1. goroutine源码分析
1.1 初始化
go程序的启动流程分为四步
call osinit, 这里就是设置了全局变量ncpu = cpu核心数量
call schedinit
make & queue new G (runtime.newproc, go func()也是调用这个函数来创建goroutine)
call runtime·mstart
其中,schedinit 就是调度器的初始化,除了schedinit 中对内存分配,垃圾回收等操作,针对调度器的初始化大致就是初始化自身,设置最大的maxmcount, 确定p的数量并初始化这些操作。
schedinit
schedinit这里对当前m进行了初始化,并根据osinit获取到的CPU核数和设置的GOMAXPROCS确定P的数量,并进行初始化。
复制代码
1 func schedinit() {
2 // 从TLS或者专用寄存器获取当前g的指针类型
3 _g_ := getg()
4 // 设置m最大的数量
5 sched.maxmcount = 10000
6
7 // 初始化栈的复用空间
8 stackinit()
9 // 初始化当前m
10 mcommoninit(_g_.m)
11
12 // osinit的时候会设置 ncpu这个全局变量,这里就是根据cpu核心数和参数GOMAXPROCS来确定p的数量
13 procs := ncpu
14 if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
15 procs = n
16 }
17 // 生成设定数量的p
18 if procresize(procs) != nil {
19 throw("unknown runnable goroutine during bootstrap")
20 }
21 }
复制代码
初始化当前M时调用了 mcommoninit() 函数,再看下这个函数的实现
mcommoninit
复制代码
1 func mcommoninit(mp *m) {
2 _g_ := getg()
3
4 lock(&sched.lock)
5 // 判断mnext的值是否溢出,mnext需要赋值给m.id
6 if sched.mnext+1 < sched.mnext {
7 throw("runtime: thread ID overflow")
8 }
9 mp.id = sched.mnext
10 sched.mnext++
11 // 判断m的数量是否比maxmcount设定的要多,如果超出直接报异常
12 checkmcount()
13 // 创建一个新的g用于处理signal,并分配栈
14 mpreinit(mp)
15 if mp.gsignal != nil {
16 mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
17 }
18
19 //添加到allm,以便垃圾收集器不会释放g-> m
20 //仅在寄存器或线程本地存储中时。
21
22 // 接下来的两行,首先将当前m放到allm的头,然后原子操作,将当前m的地址,赋值给m,这样就将当前m添加到了allm链表的头了
23 mp.alllink = allm
24
25 // NumCgoCall()遍历不带schedlock的allm,
26 //,因此我们需要安全地发布它。
27 atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
28 unlock(&sched.lock)
29
30 //如果cgo调用崩溃,则分配内存以保留cgo追溯。
31 if iscgo || GOOS == "solaris" || GOOS == "windows" {
32 mp.cgoCallers = new(cgoCallers)
33 }
34 }
复制代码
在这里就开始涉及到了m链表了,这个链表可以如下图表示:
再来看一下生成P的函数procesize:
procresize
这个函数可以改变p的数量,多退少补的原则,在初始化过程中,由于最开始是没有p的,所以开始的作用就是初始化设定数量的p。procresize不仅在初始化的时候会被调用,当用户手动调用runtime.GOMAXPROCS 的时候,会重新设定 nprocs,然后执行 startTheWorld(), startTheWorld()会是使用新的 nprocs 再次调用procresize 这个方法。
复制代码
1 func procresize(nprocs int32) *p {
2 old := gomaxprocs
3 if old < 0 || nprocs <= 0 {
4 throw("procresize: invalid arg")
5 }
6 // 更新统计
7 now := nanotime()
8 if sched.procresizetime != 0 {
9 sched.totaltime += int64(old) * (now - sched.procresizetime)
10 }
11 sched.procresizetime = now
12
13 // Grow allp if necessary.
14 // 如果新给的p的数量比原先的p的数量多,则新建增长的p
15 if nprocs > int32(len(allp)) {
16 // 与取录同步(可能正在运行)同时运行,因为它不在P上运行。
17 lock(&allpLock)
18 // 判断allp 的cap是否满足增长后的长度,满足就直接使用,不满足,则需要扩张这个slice
19 if nprocs <= int32(cap(allp)) {
20 allp = allp[:nprocs]
21 } else {
22 nallp := make([]*p, nprocs)
23 //复制所有内容至allp的上限,因此我们永远不会丢失旧分配的P。
24 copy(nallp, allp[:cap(allp)])
25 allp = nallp
26 }
27 unlock(&allpLock)
28 }
29
30 // initialize new P's
31 // 初始化新增的p
32 for i := int32(0); i < nprocs; i++ {
33 pp := allp[i]
34 if pp == nil {
35 pp = new(p)
36 pp.id = i
37 pp.status = _Pgcstop
38 pp.sudogcache = pp.sudogbuf[:0]
39 for i := range pp.deferpool {
40 pp.deferpool[i] = pp.deferpoolbuf[i][:0]
41 }
42 pp.wbBuf.reset()
43 // allp是一个slice,直接将新增的p放到对应的索引下面就ok了
44 atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
45 }
46 if pp.mcache == nil {
47 // 初始化时,old=0,第一个新建的p给当前的m使用
48 if old == 0 && i == 0 {
49 if getg().m.mcache == nil {
50 throw("missing mcache?")
51 }
52 pp.mcache = getg().m.mcache // bootstrap
53 } else {
54 // 为p分配内存
55 pp.mcache = allocmcache()
56 }
57 }
58 }
59
60 // free unused P's
61 // 释放掉多余的p,当新设置的p的数量,比原先设定的p的数量少的时候,会走到这个流程
62 // 通过 runtime.GOMAXPROCS 就可以动态的修改nprocs
63 for i := nprocs; i < old; i++ {
64 p := allp[i]
65 // move all runnable goroutines to the global queue
66 // 把当前p的运行队列里的g转移到全局的g的队列
67 for p.runqhead != p.runqtail {
68 // pop from tail of local queue
69 p.runqtail--
70 gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
71 // push onto head of global queue
72 globrunqputhead(gp)
73 }
74 // 把runnext里的g也转移到全局队列
75 if p.runnext != 0 {
76 globrunqputhead(p.runnext.ptr())
77 p.runnext = 0
78 }
79 // if there's a background worker, make it runnable and put
80 // it on the global queue so it can clean itself up
81 // 如果有gc worker的话,修改g的状态,然后再把它放到全局队列中
82 if gp := p.gcBgMarkWorker.ptr(); gp != nil {
83 casgstatus(gp, _Gwaiting, _Grunnable)
84 globrunqput(gp)
85 // This assignment doesn't race because the
86 // world is stopped.
87 p.gcBgMarkWorker.set(nil)
88 }
89 // sudoig的buf和cache,以及deferpool全部清空
90 for i := range p.sudogbuf {
91 p.sudogbuf[i] = nil
92 }
93 p.sudogcache = p.sudogbuf[:0]
94 for i := range p.deferpool {
95 for j := range p.deferpoolbuf[i] {
96 p.deferpoolbuf[i][j] = nil
97 }
98 p.deferpool[i] = p.deferpoolbuf[i][:0]
99 }
100 // 释放掉当前p的mcache
101 freemcache(p.mcache)
102 p.mcache = nil
103 // 把当前p的gfree转移到全局
104 gfpurge(p)
105 // 修改p的状态,让他自生自灭去了
106 p.status = _Pdead
107 // 无法释放P本身,因为它可以被syscall中的M引用
108 }
109
110 // Trim allp.
111 if int32(len(allp)) != nprocs {
112 lock(&allpLock)
113 allp = allp[:nprocs]
114 unlock(&allpLock)
115 }
116 // 判断当前g是否有p,有的话更改当前使用的p的状态,继续使用
117 _g_ := getg()
118 if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
119 // continue to use the current P
120 _g_.m.p.ptr().status = _Prunning
121 } else {
122 // release the current P and acquire allp[0]
123 // 如果当前g有p,但是拥有的是已经释放的p,则不再使用这个p,重新分配
124 if _g_.m.p != 0 {
125 _g_.m.p.ptr().m = 0
126 }
127 // 分配allp[0]给当前g使用
128 _g_.m.p = 0
129 _g_.m.mcache = nil
130 p := allp[0]
131 p.m = 0
132 p.status = _Pidle
133 // 将p m g绑定,并把m.mcache指向p.mcache,并修改p的状态为_Prunning
134 acquirep(p)
135 }
136 var runnablePs *p
137 for i := nprocs - 1; i >= 0; i-- {
138 p := allp[i]
139 if _g_.m.p.ptr() == p {
140 continue
141 }
142 p.status = _Pidle
143 // 根据 runqempty 来判断当前p的g运行队列是否为空
144 if runqempty(p) {
145 // g运行队列为空的p,放到 sched的pidle队列里面
146 pidleput(p)
147 } else {
148 // g 运行队列不为空的p,组成一个可运行队列,并最后返回
149 p.m.set(mget())
150 p.link.set(runnablePs)
151 runnablePs = p
152 }
153 }
154 stealOrder.reset(uint32(nprocs))
155 var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
156 atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
157 return runnablePs
158 }
复制代码
runqempty: 根据 p.runqtail == p.runqhead 和 p.runnext 来判断有没有待运行的g
pidleput: 将当前的p设置为 sched.pidle,然后根据p.link将空闲p串联起来,可参考上图allm的链表示意图
1.2 任务
只需要使用 go func 就可以创建一个goroutine,编译器会将go func 翻译成 newproc 进行调用,新建的任务是如何调用的呢,下面从创建开始进行源码跟踪
newproc
newproc 函数获取了参数和当前g的pc信息,并通过g0调用newproc1去真正的执行创建或获取可用的g:
复制代码
1 func newproc(siz int32, fn *funcval) {
2 // 获取第一参数地址
3 argp := add(unsafe.Pointer(&fn), sys.PtrSize)
4 // 获取当前执行的g
5 gp := getg()
6 // 获取当前g的pc
7 pc := getcallerpc()
8 systemstack(func() {
9 // 使用g0去执行newproc1函数
10 newproc1(fn, (*uint8)(argp), siz, gp, pc)
11 })
12 }
复制代码
newproc1
newporc1 的作用就是创建或者获取一个空的g,并初始化这个g,并尝试寻找一个p和m去执行g。
复制代码
1 func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
2 _g_ := getg()
3
4 if fn == nil {
5 _g_.m.throwing = -1 // 不要转储完整的堆栈
6 throw("go of nil func value")
7 }
8 // 加锁禁止被抢占
9 _g_.m.locks++ // 禁用抢占,因为它可以将p保留在本地变量中
10 siz := narg
11 siz = (siz + 7) &^ 7
12
13 // We could allocate a larger initial stack if necessary.
14 // Not worth it: this is almost always an error.
15 // 4*sizeof(uintreg): extra space added below
16 // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
17
18 // 如果参数过多,则直接抛出异常,栈大小是2k
19 if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
20 throw("newproc: function arguments too large for new goroutine")
21 }
22
23 _p_ := _g_.m.p.ptr()
24 // 尝试获取一个空闲的g,如果获取不到,则新建一个,并添加到allg里面
25 // gfget首先会尝试从p本地获取空闲的g,如果本地没有的话,则从全局获取一堆平衡到本地p
26 newg := gfget(_p_)
27 if newg == nil {
28 newg = malg(_StackMin)
29 casgstatus(newg, _Gidle, _Gdead)
30 // 新建的g,添加到全局的 allg里面,allg是一个slice, append进去即可
31 allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
32 }
33 // 判断获取的g的栈是否正常
34 if newg.stack.hi == 0 {
35 throw("newproc1: newg missing stack")
36 }
37 // 判断g的状态是否正常
38 if readgstatus(newg) != _Gdead {
39 throw("newproc1: new g is not Gdead")
40 }
41 // 预留一点空间,防止读取超出一点点
42 totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // 多余的空间,以防读取超出框架
43 // 空间大小进行对齐
44 totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
45 sp := newg.stack.hi - totalSize
46 spArg := sp
47 // usesLr 为0,这里不执行
48 if usesLR {
49 // caller's LR
50 *(*uintptr)(unsafe.Pointer(sp)) = 0
51 prepGoExitFrame(sp)
52 spArg += sys.MinFrameSize
53 }
54 if narg > 0 {
55 // 将参数拷贝入栈
56 memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
57 // ... 省略 ...
58 }
59 // 初始化用于保存现场的区域及初始化基本状态
60 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
61 newg.sched.sp = sp
62 newg.stktopsp = sp
63 // 这里保存了goexit的地址,在用户函数执行完成后,会根据pc来执行goexit
64 newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
65 newg.sched.g = guintptr(unsafe.Pointer(newg))
66 // 这里调整 sched 信息,pc = goexit的地址
67 gostartcallfn(&newg.sched, fn)
68 newg.gopc = callerpc
69 newg.ancestors = saveAncestors(callergp)
70 newg.startpc = fn.fn
71 if _g_.m.curg != nil {
72 newg.labels = _g_.m.curg.labels
73 }
74 if isSystemGoroutine(newg) {
75 atomic.Xadd(&sched.ngsys, +1)
76 }
77 newg.gcscanvalid = false
78 casgstatus(newg, _Gdead, _Grunnable)
79 // 如果p缓存的goid已经用完,本地再从sched批量获取一点
80 if _p_.goidcache == _p_.goidcacheend {
81 / Sched.goidgen是最后分配的ID,此批次必须为[sched.goidgen + 1,sched.goidgen + GoidCacheBatch]。
82 //在启动时sched.goidgen = 0,因此主goroutine接收goid = 1。
83 _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
84 _p_.goidcache -= _GoidCacheBatch - 1
85 _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
86 }
87 // 分配goid
88 newg.goid = int64(_p_.goidcache)
89 _p_.goidcache++
90 // 把新的g放到 p 的可运行g队列中
91 runqput(_p_, newg, true)
92 // 判断是否有空闲p,且是否需要唤醒一个m来执行g
93 if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
94 wakep()
95 }
96 _g_.m.locks--
97 if _g_.m.locks == 0 && _g_.preempt { // 恢复抢占请求,以防我们在新堆栈中清除了它
98 _g_.stackguard0 = stackPreempt
99 }
100 }
复制代码
gfget
这个函数就是看一下p有没有空闲的g,没有则去全局的freeg队列查找,这里就涉及了p本地和全局平衡的一个交互了:
复制代码
1 func gfget(_p_ *p) *g {
2 retry:
3 gp := _p_.gfree
4 // 本地的g队列为空,且全局队列不为空,则从全局队列一次获取至多32个下来,如果全局队列不够就算了
5 if gp == nil && (sched.gfreeStack != nil || sched.gfreeNoStack != nil) {
6 lock(&sched.gflock)
7 for _p_.gfreecnt < 32 {
8 if sched.gfreeStack != nil {
9 // 优先选择带堆栈的Gs。
10 gp = sched.gfreeStack
11 sched.gfreeStack = gp.schedlink.ptr()
12 } else if sched.gfreeNoStack != nil {
13 gp = sched.gfreeNoStack
14 sched.gfreeNoStack = gp.schedlink.ptr()
15 } else {
16 break
17 }
18 _p_.gfreecnt++
19 sched.ngfree--
20 gp.schedlink.set(_p_.gfree)
21 _p_.gfree = gp
22 }
23 // 已经从全局拿了g了,再去从头开始判断
24 unlock(&sched.gflock)
25 goto retry
26 }
27 // 如果拿到了g,则判断g是否有栈,没有栈就分配
28 // 栈的分配跟内存分配差不多,首先创建几个固定大小的栈的数组,然后到指定大小的数组里面去分配就ok了,过大则直接全局分配
29 if gp != nil {
30 _p_.gfree = gp.schedlink.ptr()
31 _p_.gfreecnt--
32 if gp.stack.lo == 0 {
33 // 堆栈已在gfput中释放,分配一个新的。
34 sys