前言
同步適合多個(gè)連續(xù)執(zhí)行的,每一步的執(zhí)行依賴于上一步操作,異步執(zhí)行則和任務(wù)執(zhí)行順序無(wú)關(guān)(如從10個(gè)站點(diǎn)抓取數(shù)據(jù))
同步執(zhí)行類RunnerAsync
支持返回超時(shí)檢測(cè),系統(tǒng)中斷檢測(cè)
錯(cuò)誤常量定義
//超時(shí)錯(cuò)誤
var ErrTimeout = errors.New("received timeout")
//操作系統(tǒng)系統(tǒng)中斷錯(cuò)誤
var ErrInterrupt = errors.New("received interrupt")
實(shí)現(xiàn)代碼如下
package task
import (
"os"
"time"
"os/signal"
"sync"
)
//異步執(zhí)行任務(wù)
type Runner struct {
//操作系統(tǒng)的信號(hào)檢測(cè)
interrupt chan os.Signal
//記錄執(zhí)行完成的狀態(tài)
complete chan error
//超時(shí)檢測(cè)
timeout -chan time.Time
//保存所有要執(zhí)行的任務(wù),順序執(zhí)行
tasks []func(id int) error
waitGroup sync.WaitGroup
lock sync.Mutex
errs []error
}
//new一個(gè)Runner對(duì)象
func NewRunner(d time.Duration) *Runner {
return Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d),
waitGroup: sync.WaitGroup{},
lock: sync.Mutex{},
}
}
//添加一個(gè)任務(wù)
func (this *Runner) Add(tasks ...func(id int) error) {
this.tasks = append(this.tasks, tasks...)
}
//啟動(dòng)Runner,監(jiān)聽(tīng)錯(cuò)誤信息
func (this *Runner) Start() error {
//接收操作系統(tǒng)信號(hào)
signal.Notify(this.interrupt, os.Interrupt)
//并發(fā)執(zhí)行任務(wù)
go func() {
this.complete - this.Run()
}()
select {
//返回執(zhí)行結(jié)果
case err := -this.complete:
return err
//超時(shí)返回
case -this.timeout:
return ErrTimeout
}
}
//異步執(zhí)行所有的任務(wù)
func (this *Runner) Run() error {
for id, task := range this.tasks {
if this.gotInterrupt() {
return ErrInterrupt
}
this.waitGroup.Add(1)
go func(id int) {
this.lock.Lock()
//執(zhí)行任務(wù)
err := task(id)
//加鎖保存到結(jié)果集中
this.errs = append(this.errs, err)
this.lock.Unlock()
this.waitGroup.Done()
}(id)
}
this.waitGroup.Wait()
return nil
}
//判斷是否接收到操作系統(tǒng)中斷信號(hào)
func (this *Runner) gotInterrupt() bool {
select {
case -this.interrupt:
//停止接收別的信號(hào)
signal.Stop(this.interrupt)
return true
//正常執(zhí)行
default:
return false
}
}
//獲取執(zhí)行完的error
func (this *Runner) GetErrs() []error {
return this.errs
}
使用方法
Add添加一個(gè)任務(wù),任務(wù)為接收int類型的一個(gè)閉包
Start開(kāi)始執(zhí)行傷,返回一個(gè)error類型,nil為執(zhí)行完畢, ErrTimeout代表執(zhí)行超時(shí),ErrInterrupt代表執(zhí)行被中斷(類似Ctrl + C操作)
測(cè)試示例代碼
package task
import (
"testing"
"time"
"fmt"
"os"
"runtime"
)
func TestRunnerAsync_Start(t *testing.T) {
//開(kāi)啟多核
runtime.GOMAXPROCS(runtime.NumCPU())
//創(chuàng)建runner對(duì)象,設(shè)置超時(shí)時(shí)間
runner := NewRunnerAsync(8 * time.Second)
//添加運(yùn)行的任務(wù)
runner.Add(
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
)
fmt.Println("同步執(zhí)行任務(wù)")
//開(kāi)始執(zhí)行任務(wù)
if err := runner.Start(); err != nil {
switch err {
case ErrTimeout:
fmt.Println("執(zhí)行超時(shí)")
os.Exit(1)
case ErrInterrupt:
fmt.Println("任務(wù)被中斷")
os.Exit(2)
}
}
t.Log("執(zhí)行結(jié)束")
}
//創(chuàng)建要執(zhí)行的任務(wù)
func createTaskAsync() func(id int) {
return func(id int) {
fmt.Printf("正在執(zhí)行%v個(gè)任務(wù)\n", id)
//模擬任務(wù)執(zhí)行,sleep兩秒
//time.Sleep(1 * time.Second)
}
}
執(zhí)行結(jié)果
同步執(zhí)行任務(wù)
正在執(zhí)行0個(gè)任務(wù)
正在執(zhí)行1個(gè)任務(wù)
正在執(zhí)行2個(gè)任務(wù)
正在執(zhí)行3個(gè)任務(wù)
正在執(zhí)行4個(gè)任務(wù)
正在執(zhí)行5個(gè)任務(wù)
正在執(zhí)行6個(gè)任務(wù)
正在執(zhí)行7個(gè)任務(wù)
正在執(zhí)行8個(gè)任務(wù)
正在執(zhí)行9個(gè)任務(wù)
正在執(zhí)行10個(gè)任務(wù)
正在執(zhí)行11個(gè)任務(wù)
正在執(zhí)行12個(gè)任務(wù)
runnerAsync_test.go:49: 執(zhí)行結(jié)束
異步執(zhí)行類Runner
支持返回超時(shí)檢測(cè),系統(tǒng)中斷檢測(cè)
實(shí)現(xiàn)代碼如下
package task
import (
"os"
"time"
"os/signal"
"sync"
)
//異步執(zhí)行任務(wù)
type Runner struct {
//操作系統(tǒng)的信號(hào)檢測(cè)
interrupt chan os.Signal
//記錄執(zhí)行完成的狀態(tài)
complete chan error
//超時(shí)檢測(cè)
timeout -chan time.Time
//保存所有要執(zhí)行的任務(wù),順序執(zhí)行
tasks []func(id int) error
waitGroup sync.WaitGroup
lock sync.Mutex
errs []error
}
//new一個(gè)Runner對(duì)象
func NewRunner(d time.Duration) *Runner {
return Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d),
waitGroup: sync.WaitGroup{},
lock: sync.Mutex{},
}
}
//添加一個(gè)任務(wù)
func (this *Runner) Add(tasks ...func(id int) error) {
this.tasks = append(this.tasks, tasks...)
}
//啟動(dòng)Runner,監(jiān)聽(tīng)錯(cuò)誤信息
func (this *Runner) Start() error {
//接收操作系統(tǒng)信號(hào)
signal.Notify(this.interrupt, os.Interrupt)
//并發(fā)執(zhí)行任務(wù)
go func() {
this.complete - this.Run()
}()
select {
//返回執(zhí)行結(jié)果
case err := -this.complete:
return err
//超時(shí)返回
case -this.timeout:
return ErrTimeout
}
}
//異步執(zhí)行所有的任務(wù)
func (this *Runner) Run() error {
for id, task := range this.tasks {
if this.gotInterrupt() {
return ErrInterrupt
}
this.waitGroup.Add(1)
go func(id int) {
this.lock.Lock()
//執(zhí)行任務(wù)
err := task(id)
//加鎖保存到結(jié)果集中
this.errs = append(this.errs, err)
this.lock.Unlock()
this.waitGroup.Done()
}(id)
}
this.waitGroup.Wait()
return nil
}
//判斷是否接收到操作系統(tǒng)中斷信號(hào)
func (this *Runner) gotInterrupt() bool {
select {
case -this.interrupt:
//停止接收別的信號(hào)
signal.Stop(this.interrupt)
return true
//正常執(zhí)行
default:
return false
}
}
//獲取執(zhí)行完的error
func (this *Runner) GetErrs() []error {
return this.errs
}
使用方法
Add添加一個(gè)任務(wù),任務(wù)為接收int類型,返回類型error的一個(gè)閉包
Start開(kāi)始執(zhí)行傷,返回一個(gè)error類型,nil為執(zhí)行完畢, ErrTimeout代表執(zhí)行超時(shí),ErrInterrupt代表執(zhí)行被中斷(類似Ctrl + C操作)
getErrs獲取所有的任務(wù)執(zhí)行結(jié)果
測(cè)試示例代碼
package task
import (
"testing"
"time"
"fmt"
"os"
"runtime"
)
func TestRunner_Start(t *testing.T) {
//開(kāi)啟多核心
runtime.GOMAXPROCS(runtime.NumCPU())
//創(chuàng)建runner對(duì)象,設(shè)置超時(shí)時(shí)間
runner := NewRunner(18 * time.Second)
//添加運(yùn)行的任務(wù)
runner.Add(
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
)
fmt.Println("異步執(zhí)行任務(wù)")
//開(kāi)始執(zhí)行任務(wù)
if err := runner.Start(); err != nil {
switch err {
case ErrTimeout:
fmt.Println("執(zhí)行超時(shí)")
os.Exit(1)
case ErrInterrupt:
fmt.Println("任務(wù)被中斷")
os.Exit(2)
}
}
t.Log("執(zhí)行結(jié)束")
t.Log(runner.GetErrs())
}
//創(chuàng)建要執(zhí)行的任務(wù)
func createTask() func(id int) error {
return func(id int) error {
fmt.Printf("正在執(zhí)行%v個(gè)任務(wù)\n", id)
//模擬任務(wù)執(zhí)行,sleep
//time.Sleep(1 * time.Second)
return nil
}
}
執(zhí)行結(jié)果
異步執(zhí)行任務(wù)
正在執(zhí)行2個(gè)任務(wù)
正在執(zhí)行1個(gè)任務(wù)
正在執(zhí)行4個(gè)任務(wù)
正在執(zhí)行3個(gè)任務(wù)
正在執(zhí)行6個(gè)任務(wù)
正在執(zhí)行5個(gè)任務(wù)
正在執(zhí)行9個(gè)任務(wù)
正在執(zhí)行7個(gè)任務(wù)
正在執(zhí)行10個(gè)任務(wù)
正在執(zhí)行13個(gè)任務(wù)
正在執(zhí)行8個(gè)任務(wù)
正在執(zhí)行11個(gè)任務(wù)
正在執(zhí)行12個(gè)任務(wù)
正在執(zhí)行0個(gè)任務(wù)
runner_test.go:49: 執(zhí)行結(jié)束
runner_test.go:51: [nil> nil> nil> nil> nil> nil> nil> nil> nil> nil> nil> nil> nil> nil>]
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。