Go routine 编排框架:oklog/run 包
目录
对于每一个 Go routine 组件,我们都有相应的办法来执行结束操作。状态机通过 Context 对象,HTTP 服务器通过调用 Listener 的 Close 方法,定时任务和监听器通过 channel。当一个组件结束的时候,需要通知其他组件有序执行结束操作。这个问题的解决方法可以用 Actor 模型来描述。每个 Go routine 都是一个 actor,互相独立,互相之间只能通过 message 通信。oklog/run 包实现了 Actor 模型,能非常简洁的实现 Go routine 编排功能。
2. oklog/run 包介绍
oklog/run 包非常简单,只有一个类型,两个方法,共 60 行代码。其中 Group 是一组 actor,通过调用 Add 方法将 actor 添加到 Group 中。
type Group func (g *Group) Add(execute func() error, interrupt func(error)) func (g *Group) Run() error
type Group struct { actors []actor } func (g *Group) Add(execute func() error, interrupt func(error)) { g.actors = append(g.actors, actor{execute, interrupt}) }
每个 actor 有两个方法:execute 和 interrupt。execute 完成 Go routine 的计算任务,interrupt 结束 Go routine 并退出。
type actor struct { execute func() error interrupt func(error) }
调用 Run 方法后会启动所有 Go routine(或者称为 actor),并等待第一个结束的 Go routine(无论正常退出或因为异常终止)。一旦捕获到第一个结束信号,会依次结束其他 Go routine 直到所有 Go routine 完全退出。
func (g *Group) Run() error { if len(g.actors) == 0 { return nil } // Run each actor. errors := make(chan error, len(g.actors)) for _, a := range g.actors { go func(a actor) { errors <- a.execute() }(a) } // Wait for the first actor to stop. err := <-errors // Signal all actors to stop. for _, a := range g.actors { a.interrupt(err) } // Wait for all actors to stop. for i := 1; i < cap(errors); i++ { <-errors } // Return the original error. return err }
3. 使用例子
下面例子定义了三个 actor,前两个 actor 一直等待。第三个 actor 在 3s 后结束退出。引起前两个 actor 退出。
package main import ( "fmt" "github.com/oklog/run" "time" ) func main() { g := run.Group{} { cancel := make(chan struct{}) g.Add( func() error { select { case <- cancel: fmt.Println("Go routine 1 is closed") break } return nil }, func(error) { close(cancel) }, ) } { cancel := make(chan