1.1 初始化   go程序的启动流程分为四步 call osinit, 这里就是设置了全局变量ncpu = cpu核心数量 call schedinit make & queue new G (runtime.newproc, go func()也是调用这个函数来创建goroutine) call runtime·mstart   其中,schedinit 就是调度器的初始化,除了schedinit 中对内存分配,垃圾回收等操作,针对调度器的初始化大致就是初始化自身,设置最大的maxmcount, 确定p的数量并初始化这些操作。 schedinit   schedinit这里对当前m进行了初始化,并根据osinit获取到的CPU核数和设置的GOMAXPROCS确定P的数量,并进行初始化。 复制代码 1 func schedinit() { 2 // 从TLS或者专用寄存器获取当前g的指针类型 3 _g_ := getg() 4 // 设置m最大的数量 5 sched.maxmcount = 10000 6 7 // 初始化栈的复用空间 8 stackinit() 9 // 初始化当前m 10 mcommoninit(_g_.m) 11 12 // osinit的时候会设置 ncpu这个全局变量,这里就是根据cpu核心数和参数GOMAXPROCS来确定p的数量 13 procs := ncpu 14 if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 { 15 procs = n 16 } 17 // 生成设定数量的p 18 if procresize(procs) != nil { 19 throw("unknown runnable goroutine during bootstrap") 20 } 21 } 复制代码   初始化当前M时调用了 mcommoninit() 函数,再看下这个函数的实现 mcommoninit 复制代码 1 func mcommoninit(mp *m) { 2 _g_ := getg() 3 4 lock(&sched.lock) 5 // 判断mnext的值是否溢出,mnext需要赋值给m.id 6 if sched.mnext+1 < sched.mnext { 7 throw("runtime: thread ID overflow") 8 } 9 mp.id = sched.mnext 10 sched.mnext++ 11 // 判断m的数量是否比maxmcount设定的要多,如果超出直接报异常 12 checkmcount() 13 // 创建一个新的g用于处理signal,并分配栈 14 mpreinit(mp) 15 if mp.gsignal != nil { 16 mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard 17 } 18 19 //添加到allm,以便垃圾收集器不会释放g-> m 20 //仅在寄存器或线程本地存储中时。 21 22 // 接下来的两行,首先将当前m放到allm的头,然后原子操作,将当前m的地址,赋值给m,这样就将当前m添加到了allm链表的头了 23 mp.alllink = allm 24 25 // NumCgoCall()遍历不带schedlock的allm, 26 //,因此我们需要安全地发布它。 27 atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp)) 28 unlock(&sched.lock) 29 30 //如果cgo调用崩溃,则分配内存以保留cgo追溯。 31 if iscgo || GOOS == "solaris" || GOOS == "windows" { 32 mp.cgoCallers = new(cgoCallers) 33 } 34 } 复制代码   在这里就开始涉及到了m链表了,这个链表可以如下图表示:   再来看一下生成P的函数procesize: procresize   这个函数可以改变p的数量,多退少补的原则,在初始化过程中,由于最开始是没有p的,所以开始的作用就是初始化设定数量的p。procresize不仅在初始化的时候会被调用,当用户手动调用runtime.GOMAXPROCS 的时候,会重新设定 nprocs,然后执行 startTheWorld(), startTheWorld()会是使用新的 nprocs 再次调用procresize 这个方法。 复制代码 1 func procresize(nprocs int32) *p { 2 old := gomaxprocs 3 if old < 0 || nprocs <= 0 { 4 throw("procresize: invalid arg") 5 } 6 // 更新统计 7 now := nanotime() 8 if sched.procresizetime != 0 { 9 sched.totaltime += int64(old) * (now - sched.procresizetime) 10 } 11 sched.procresizetime = now 12 13 // Grow allp if necessary. 14 // 如果新给的p的数量比原先的p的数量多,则新建增长的p 15 if nprocs > int32(len(allp)) { 16 // 与取录同步(可能正在运行)同时运行,因为它不在P上运行。 17 lock(&allpLock) 18 // 判断allp 的cap是否满足增长后的长度,满足就直接使用,不满足,则需要扩张这个slice 19 if nprocs <= int32(cap(allp)) { 20 allp = allp[:nprocs] 21 } else { 22 nallp := make([]*p, nprocs) 23 //复制所有内容至allp的上限,因此我们永远不会丢失旧分配的P。 24 copy(nallp, allp[:cap(allp)]) 25 allp = nallp 26 } 27 unlock(&allpLock) 28 } 29 30 // initialize new P's 31 // 初始化新增的p 32 for i := int32(0); i < nprocs; i++ { 33 pp := allp[i] 34 if pp == nil { 35 pp = new(p) 36 pp.id = i 37 pp.status = _Pgcstop 38 pp.sudogcache = pp.sudogbuf[:0] 39 for i := range pp.deferpool { 40 pp.deferpool[i] = pp.deferpoolbuf[i][:0] 41 } 42 pp.wbBuf.reset() 43 // allp是一个slice,直接将新增的p放到对应的索引下面就ok了 44 atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) 45 } 46 if pp.mcache == nil { 47 // 初始化时,old=0,第一个新建的p给当前的m使用 48 if old == 0 && i == 0 { 49 if getg().m.mcache == nil { 50 throw("missing mcache?") 51 } 52 pp.mcache = getg().m.mcache // bootstrap 53 } else { 54 // 为p分配内存 55 pp.mcache = allocmcache() 56 } 57 } 58 } 59 60 // free unused P's 61 // 释放掉多余的p,当新设置的p的数量,比原先设定的p的数量少的时候,会走到这个流程 62 // 通过 runtime.GOMAXPROCS 就可以动态的修改nprocs 63 for i := nprocs; i < old; i++ { 64 p := allp[i] 65 // move all runnable goroutines to the global queue 66 // 把当前p的运行队列里的g转移到全局的g的队列 67 for p.runqhead != p.runqtail { 68 // pop from tail of local queue 69 p.runqtail-- 70 gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr() 71 // push onto head of global queue 72 globrunqputhead(gp) 73 } 74 // 把runnext里的g也转移到全局队列 75 if p.runnext != 0 { 76 globrunqputhead(p.runnext.ptr()) 77 p.runnext = 0 78 } 79 // if there's a background worker, make it runnable and put 80 // it on the global queue so it can clean itself up 81 // 如果有gc worker的话,修改g的状态,然后再把它放到全局队列中 82 if gp := p.gcBgMarkWorker.ptr(); gp != nil { 83 casgstatus(gp, _Gwaiting, _Grunnable) 84 globrunqput(gp) 85 // This assignment doesn't race because the 86 // world is stopped. 87 p.gcBgMarkWorker.set(nil) 88 } 89 // sudoig的buf和cache,以及deferpool全部清空 90 for i := range p.sudogbuf { 91 p.sudogbuf[i] = nil 92 } 93 p.sudogcache = p.sudogbuf[:0] 94 for i := range p.deferpool { 95 for j := range p.deferpoolbuf[i] { 96 p.deferpoolbuf[i][j] = nil 97 } 98 p.deferpool[i] = p.deferpoolbuf[i][:0] 99 } 100 // 释放掉当前p的mcache 101 freemcache(p.mcache) 102 p.mcache = nil 103 // 把当前p的gfree转移到全局 104 gfpurge(p) 105 // 修改p的状态,让他自生自灭去了 106 p.status = _Pdead 107 // 无法释放P本身,因为它可以被syscall中的M引用 108 } 109 110 // Trim allp. 111 if int32(len(allp)) != nprocs { 112 lock(&allpLock) 113 allp = allp[:nprocs] 114 unlock(&allpLock) 115 } 116 // 判断当前g是否有p,有的话更改当前使用的p的状态,继续使用 117 _g_ := getg() 118 if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { 119 // continue to use the current P 120 _g_.m.p.ptr().status = _Prunning 121 } else { 122 // release the current P and acquire allp[0] 123 // 如果当前g有p,但是拥有的是已经释放的p,则不再使用这个p,重新分配 124 if _g_.m.p != 0 { 125 _g_.m.p.ptr().m = 0 126 } 127 // 分配allp[0]给当前g使用 128 _g_.m.p = 0 129 _g_.m.mcache = nil 130 p := allp[0] 131 p.m = 0 132 p.status = _Pidle 133 // 将p m g绑定,并把m.mcache指向p.mcache,并修改p的状态为_Prunning 134 acquirep(p) 135 } 136 var runnablePs *p 137 for i := nprocs - 1; i >= 0; i-- { 138 p := allp[i] 139 if _g_.m.p.ptr() == p { 140 continue 141 } 142 p.status = _Pidle 143 // 根据 runqempty 来判断当前p的g运行队列是否为空 144 if runqempty(p) { 145 // g运行队列为空的p,放到 sched的pidle队列里面 146 pidleput(p) 147 } else { 148 // g 运行队列不为空的p,组成一个可运行队列,并最后返回 149 p.m.set(mget()) 150 p.link.set(runnablePs) 151 runnablePs = p 152 } 153 } 154 stealOrder.reset(uint32(nprocs)) 155 var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32 156 atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs)) 157 return runnablePs 158 } 复制代码 runqempty: 根据 p.runqtail == p.runqhead 和 p.runnext 来判断有没有待运行的g pidleput: 将当前的p设置为 sched.pidle,然后根据p.link将空闲p串联起来,可参考上图allm的链表示意图 1.2 任务   只需要使用 go func 就可以创建一个goroutine,编译器会将go func 翻译成 newproc 进行调用,新建的任务是如何调用的呢,下面从创建开始进行源码跟踪 newproc   newproc 函数获取了参数和当前g的pc信息,并通过g0调用newproc1去真正的执行创建或获取可用的g: 复制代码 1 func newproc(siz int32, fn *funcval) { 2 // 获取第一参数地址 3 argp := add(unsafe.Pointer(&fn), sys.PtrSize) 4 // 获取当前执行的g 5 gp := getg() 6 // 获取当前g的pc 7 pc := getcallerpc() 8 systemstack(func() { 9 // 使用g0去执行newproc1函数 10 newproc1(fn, (*uint8)(argp), siz, gp, pc) 11 }) 12 } 复制代码 newproc1   newporc1 的作用就是创建或者获取一个空的g,并初始化这个g,并尝试寻找一个p和m去执行g。 复制代码 1 func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) { 2 _g_ := getg() 3 4 if fn == nil { 5 _g_.m.throwing = -1 // 不要转储完整的堆栈 6 throw("go of nil func value") 7 } 8 // 加锁禁止被抢占 9 _g_.m.locks++ // 禁用抢占,因为它可以将p保留在本地变量中 10 siz := narg 11 siz = (siz + 7) &^ 7 12 13 // We could allocate a larger initial stack if necessary. 14 // Not worth it: this is almost always an error. 15 // 4*sizeof(uintreg): extra space added below 16 // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall). 17 18 // 如果参数过多,则直接抛出异常,栈大小是2k 19 if siz >= _StackMin-4*sys.RegSize-sys.RegSize { 20 throw("newproc: function arguments too large for new goroutine") 21 } 22 23 _p_ := _g_.m.p.ptr() 24 // 尝试获取一个空闲的g,如果获取不到,则新建一个,并添加到allg里面 25 // gfget首先会尝试从p本地获取空闲的g,如果本地没有的话,则从全局获取一堆平衡到本地p 26 newg := gfget(_p_) 27 if newg == nil { 28 newg = malg(_StackMin) 29 casgstatus(newg, _Gidle, _Gdead) 30 // 新建的g,添加到全局的 allg里面,allg是一个slice, append进去即可 31 allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack. 32 } 33 // 判断获取的g的栈是否正常 34 if newg.stack.hi == 0 { 35 throw("newproc1: newg missing stack") 36 } 37 // 判断g的状态是否正常 38 if readgstatus(newg) != _Gdead { 39 throw("newproc1: new g is not Gdead") 40 } 41 // 预留一点空间,防止读取超出一点点 42 totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // 多余的空间,以防读取超出框架 43 // 空间大小进行对齐 44 totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign 45 sp := newg.stack.hi - totalSize 46 spArg := sp 47 // usesLr 为0,这里不执行 48 if usesLR { 49 // caller's LR 50 *(*uintptr)(unsafe.Pointer(sp)) = 0 51 prepGoExitFrame(sp) 52 spArg += sys.MinFrameSize 53 } 54 if narg > 0 { 55 // 将参数拷贝入栈 56 memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg)) 57 // ... 省略 ... 58 } 59 // 初始化用于保存现场的区域及初始化基本状态 60 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) 61 newg.sched.sp = sp 62 newg.stktopsp = sp 63 // 这里保存了goexit的地址,在用户函数执行完成后,会根据pc来执行goexit 64 newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function 65 newg.sched.g = guintptr(unsafe.Pointer(newg)) 66 // 这里调整 sched 信息,pc = goexit的地址 67 gostartcallfn(&newg.sched, fn) 68 newg.gopc = callerpc 69 newg.ancestors = saveAncestors(callergp) 70 newg.startpc = fn.fn 71 if _g_.m.curg != nil { 72 newg.labels = _g_.m.curg.labels 73 } 74 if isSystemGoroutine(newg) { 75 atomic.Xadd(&sched.ngsys, +1) 76 } 77 newg.gcscanvalid = false 78 casgstatus(newg, _Gdead, _Grunnable) 79 // 如果p缓存的goid已经用完,本地再从sched批量获取一点 80 if _p_.goidcache == _p_.goidcacheend { 81 / Sched.goidgen是最后分配的ID,此批次必须为[sched.goidgen + 1,sched.goidgen + GoidCacheBatch]。 82 //在启动时sched.goidgen = 0,因此主goroutine接收goid = 1。 83 _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch) 84 _p_.goidcache -= _GoidCacheBatch - 1 85 _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch 86 } 87 // 分配goid 88 newg.goid = int64(_p_.goidcache) 89 _p_.goidcache++ 90 // 把新的g放到 p 的可运行g队列中 91 runqput(_p_, newg, true) 92 // 判断是否有空闲p,且是否需要唤醒一个m来执行g 93 if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted { 94 wakep() 95 } 96 _g_.m.locks-- 97 if _g_.m.locks == 0 && _g_.preempt { // 恢复抢占请求,以防我们在新堆栈中清除了它 98 _g_.stackguard0 = stackPreempt 99 } 100 } 复制代码 gfget   这个函数就是看一下p有没有空闲的g,没有则去全局的freeg队列查找,这里就涉及了p本地和全局平衡的一个交互了: 复制代码 1 func gfget(_p_ *p) *g { 2 retry: 3 gp := _p_.gfree 4 // 本地的g队列为空,且全局队列不为空,则从全局队列一次获取至多32个下来,如果全局队列不够就算了 5 if gp == nil && (sched.gfreeStack != nil || sched.gfreeNoStack != nil) { 6 lock(&sched.gflock) 7 for _p_.gfreecnt < 32 { 8 if sched.gfreeStack != nil { 9 // 优先选择带堆栈的Gs。 10 gp = sched.gfreeStack 11 sched.gfreeStack = gp.schedlink.ptr() 12 } else if sched.gfreeNoStack != nil { 13 gp = sched.gfreeNoStack 14 sched.gfreeNoStack = gp.schedlink.ptr() 15 } else { 16 break 17 } 18 _p_.gfreecnt++ 19 sched.ngfree-- 20 gp.schedlink.set(_p_.gfree) 21 _p_.gfree = gp 22 } 23 // 已经从全局拿了g了,再去从头开始判断 24 unlock(&sched.gflock) 25 goto retry 26 } 27 // 如果拿到了g,则判断g是否有栈,没有栈就分配 28 // 栈的分配跟内存分配差不多,首先创建几个固定大小的栈的数组,然后到指定大小的数组里面去分配就ok了,过大则直接全局分配 29 if gp != nil { 30 _p_.gfree = gp.schedlink.ptr() 31 _p_.gfreecnt-- 32 if gp.stack.lo == 0 { 33 // 堆栈已在gfput中释放,分配一个新的。 34 sys