Python3 与 C# 并发编程之~ 线程上篇

文章汇总:https://www.cnblogs.com/dotnetcrazy/p/9160514.html 目录: 2.1.入门篇¶ 2.1.1.线程案例¶ 2.1.2.指定线程名¶ 2.1.3.线程池案例¶ 2.1.4.其他扩展¶ 2.2.加强篇¶ 2.2.1.线程同步~互斥锁Lock¶ 2.2.2.线程同步~可重入锁RLock¶ 2.2.3.死锁引入¶ 2.2.4.线程同步~条件变量Condition¶ 2.2.5.线程同步~信号量Semaphore(互斥锁的高级版)¶ 锁专题扩展¶ 自行拓展¶ 扩展:线程安全¶ 2.线程篇 示例代码:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Thread 终于说道线程了,心酸啊,进程还有点东西下次接着聊,这周4天外出,所以注定发文少了+_+ 用过Java或者Net的重点都在线程这块,Python的重点其实在上篇,但线程自有其独到之处~比如资源共享(更轻量级) 这次采用循序渐进的方式讲解,先使用,再深入,然后扩展,最后来个案例,呃.呃.呃.先这样计划~欢迎纠正错误 2.1.入门篇 官方文档:https://docs.python.org/3/library/threading.html 进程是由若干线程组成的(一个进程至少有一个线程) 2.1.1.线程案例 用法和Process差不多,咱先看个案例:Thread(target=test, args=(i, )) import os from threading import Thread, current_thread def test(name): # current_thread()返回当前线程的实例 thread_name = current_thread().name # 获取线程名 print(f"[编号:{name}],ThreadName:{thread_name}\nPID:{os.getpid()},PPID:{os.getppid()}") def main(): t_list = [Thread(target=test, args=(i, )) for i in range(5)] for t in t_list: t.start() # 批量启动 for t in t_list: t.join() # 批量回收 # 主线程 print(f"[Main]ThreadName:{current_thread().name}\nPID:{os.getpid()},PPID:{os.getppid()}") if __name__ == '__main__': main() 输出:(同一个进程ID) [编号:0],ThreadName:Thread-1 PID:20533,PPID:19830 [编号:1],ThreadName:Thread-2 PID:20533,PPID:19830 [编号:2],ThreadName:Thread-3 PID:20533,PPID:19830 [编号:3],ThreadName:Thread-4 PID:20533,PPID:19830 [编号:4],ThreadName:Thread-5 PID:20533,PPID:19830 [Main]ThreadName:MainThread PID:22636,PPID:19830 注意一点:Python里面的线程是Posix Thread 2.1.2.指定线程名 如果想给线程设置一个Div的名字呢?: from threading import Thread, current_thread def test(): # current_thread()返回当前线程的实例 print(f"ThreadName:{current_thread().name}") def main(): t1 = Thread(target=test, name="小明") t2 = Thread(target=test) t1.start() t2.start() t1.join() t2.join() # 主线程 print(f"[Main],ThreadName:{current_thread().name}") if __name__ == '__main__': main() 输出:(你指定有特点的名字,没指定就使用默认命令【联想古时候奴隶名字都是编号,主人赐名就有名了】) ThreadName:小明 ThreadName:Thread-1 [Main],ThreadName:MainThread 类的方式创建线程 from threading import Thread class MyThread(Thread): def __init__(self, name): # 设个坑,你可以自行研究下 super().__init__() # 放在后面就报错了 self.name = name def run(self): print(self.name) def main(): t = MyThread(name="小明") t.start() t.join() if __name__ == '__main__': main() 输出:(和Thread初始化的name冲突了【变量名得注意哦】) 小明 2.1.3.线程池案例 from multiprocessing.dummy import Pool as ThreadPool, current_process def test(i): # 本质调用了:threading.current_thread print(f"[编号{i}]{current_process().name}") def main(): p = ThreadPool() for i in range(5): p.apply_async(test, args=(i, )) p.close() p.join() print(f"{current_process().name}") if __name__ == '__main__': main() 输出: [编号0]Thread-3 [编号1]Thread-4 [编号3]Thread-2 [编号2]Thread-1 [编号4]Thread-3 MainThread 微微扩展一下 对上面代码,项目里面一般都会这么优化:(并行这块线程后面会讲,不急) from multiprocessing.dummy import Pool as ThreadPool, current_process def test(i): # 源码:current_process = threading.current_thread print(f"[编号{i}]{current_process().name}") def main(): p = ThreadPool() p.map_async(test, list(range(5))) p.close() p.join() print(f"{current_process().name}") if __name__ == '__main__': main() 输出: [编号0]Thread-2 [编号1]Thread-4 [编号2]Thread-3 [编号4]Thread-2 [编号3]Thread-1 MainThread 代码改动很小(循环换成了map)性能提升很明显(密集型操作) 2.1.4.其他扩展 Thread初始化参数: daemon:是否为后台线程(主进程退出后台线程就退出了) Thread实例对象的方法: isAlive(): 返回线程是否活动的 getName(): 返回线程名 setName(): 设置线程名 isDaemon():是否为后台线程 setDaemon(True):设置后台线程 threading模块提供的一些方法: threading.currentThread(): 返回当前的线程实例 threading.enumerate(): 返回一个包含正在运行的线程List(线程启动后、结束前) threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果 看一个小案例: import time from threading import Thread, active_count def test1(): print("test1") time.sleep(1) print("test1 ok") def test2(): print("test2") time.sleep(2) print("test2 ok") def main(): t1 = Thread(target=test1) t2 = Thread(target=test2, daemon=True) t1.start() t2.start() t1.join() print(active_count()) print(t1.is_alive) print(t2.is_alive) # t2.join() # 除非加这一句才等daemon线程,不然直接不管了 if __name__ == '__main__': main() 下次就以multiprocessing.dummy模块为例了,API和threading几乎一样,进行了一些并发的封装,性价比更高 2.2.加强篇 其实以前的Linux中是没有线程这个概念的,Windows程序员经常使用线程,这一看~方便啊,然后可能是当时程序员偷懒了,就把进程模块改了改(这就是为什么之前说Linux下的多进程编程其实没有Win下那么“重量级”),弄了个精简版进程==>线程(内核是分不出进程和线程的,反正PCB个数都是一样) 多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享(全局变量和堆 ==> 线程间共享。进程的栈 ==> 线程平分而独占) 还记得通过current_thread()获取的线程信息吗?难道线程也没个id啥的?一起看看:(通过ps -Lf pid 来查看LWP) 1.线程ID.png 回顾:进程共享的内容:(回顾:http://www.cnblogs.com/dotnetcrazy/p/9363810.html) 代码(.text) 文件描述符(fd) 内存映射(mmap) 2.2.1.线程同步~互斥锁Lock 线程之间共享数据的确方便,但是也容易出现数据混乱的现象,来看个例子: from multiprocessing.dummy import threading num = 0 # def global num def test(i): print(f"子进程:{i}") global num for i in range(100000): num += 1 def main(): p_list = [threading.Thread(target=test, args=(i, )) for i in range(5)] for i in p_list: i.start() for i in p_list: i.join() print(num) # 应该是500000,发生了数据混乱,结果少了很多 if __name__ == '__main__': main() 输出:(应该是500000,发生了数据混乱,只剩下358615) 子进程:0 子进程:1 子进程:2 子进程:3 子进程:4 452238 Lock案例 共享资源+CPU调度==>数据混乱==解决==>线程同步 这时候Lock就该上场了 互斥锁是实现线程同步最简单的一种方式,读写都加锁(读写都会串行) 先看看上面例子怎么解决调: from multiprocessing.dummy import threading, Lock num = 0 # def global num def test(i, lock): print(f"子进程:{i}") global num for i in range(100000): with lock: num += 1 def main(): lock = Lock() p_list = [threading.Thread(target=test, args=(i, lock)) for i in range(5)] for i in p_list: i.start() for i in p_list: i.join() print(num) if __name__ == '__main__': main() 输出:time python3 1.thread.2.py 子进程:0 子进程:1 子进程:2 子进程:3 子进程:4 500000 real 0m2.846s user 0m1.897s sys 0m3.159s 优化下 lock设置为全局或者局部,性能几乎一样。循环换成map后性能有所提升(测试案例在Code中) from multiprocessing.dummy import Pool as ThreadPool, Lock num = 0 # def global num lock = Lock() def test(i): print(f"子进程:{i}") global num global lock for i in range(100000): with lock: num += 1 def main(): p = ThreadPool() p.map_async(test, list(range(5))) p.close() p.join() print(num) if __name__ == '__main__': main() 输出: time python3 1.thread.2.py 子进程:0 子进程:1 子进程:3 子进程:2 子进程:4 500000 real 0m2.468s user 0m1.667s sys 0m2.644s 本来多线程访问共享资源的时候可以并行,加锁后就部分串行了(没获取到的线程就阻塞等了) 【项目中可以多次加锁,每次加锁只对修改部分加(尽量少的代码) 】(以后会说协程和Actor模型) 补充:以前都是这么写的,现在支持with托管了(有时候还会用到,所以了解下):【net是直接lock大括号包起来】 #### 以前写法: lock.acquire() # 获取锁 try: num += 1 finally: lock.release() # 释放锁 #### 等价简写 with lock: num += 1 扩展知识:(GIL在扩展篇会详说) GIL的作用:多线程情况下必须存在资源的竞争,GIL是为了保证在解释器级别的线程唯一使用共享资源(cpu)。 同步锁的作用:为了保证解释器级别下的自己编写的程序唯一使用共享资源产生了同步锁 2.2.2.线程同步~可重入锁RLock 看个场景:小明欠小张2000,欠小周5000,现在需要同时转账给他们:(规定:几次转账加几次锁) 2.RLock.png 小明啥也没管,直接撸起袖子就写Code了:(错误Code示意) from multiprocessing.dummy import Pool as ThreadPool, Lock xiaoming = 8000 xiaozhang = 3000 xiaozhou = 5000 def test(lock): global xiaoming global xiaozhang global xiaozhou # 小明想一次搞定: with lock: # 小明转账2000给小张 xiaoming -= 2000 xiaozhang += 2000 with lock: # 小明转账5000给小周 xiaoming -= 5000 xiaozhou += 5000 def main(): print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}") lock = Lock() p = ThreadPool() p.apply_async(test, args=(lock, )) p.close() p.join() print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}") if __name__ == '__main__': main() 小明写完代码就出去了,这可把小周和小张等急了,打了N个电话来催,小明心想啥情况? 一看代码楞住了,改了改代码,轻轻松松把钱转出去了: from multiprocessing.dummy import Pool as ThreadPool, Lock xiaoming = 8000 xiaozhang = 3000 xiaozhou = 5000 # 小明转账2000给小张 def a_to_b(lock): global xiaoming global xiaozhang with lock: xiaoming -= 2000 xiaozhang += 2000 # 小明转账5000给小周 def a_to_c(lock): global xiaoming global xiaozhou with lock: xiaoming -= 5000 xiaozhou += 5000 def main(): print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}") lock = Lock() p = ThreadPool() p.apply_async(a_to_b, args=(lock, )) p.apply_async(a_to_c, args=(lock, )) p.close() p.join() print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}") if __name__ == '__main__': main() 输出: [还钱前]小明8000,小张3000,小周5000 [还钱后]小明1000,小张5000,小周10000 就这么算了吗?不不不,不符合小明性格,于是小明研究了下,发现~还有个递归锁RLock呢,正好解决他的问题: from multiprocessing.dummy import Pool as ThreadPool, RLock # 就把这边换了下 xiaoming = 8000 xiaozhang = 3000 xiaozhou = 5000 def test(lock): global xiaoming global xiaozhang global xiaozhou # 小明想一次搞定: with lock: # 小明转账2000给小张 xiaoming -= 2000 xiaozhang += 2000 with lock: # 小明转账5000给小周 xiaoming -= 5000 xiaozhou += 5000 def main(): print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}") lock = RLock() # 就把这边换了下 p = ThreadPool() p.apply_async(test, args=(lock, )) p.close() p.join() print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}") if __name__ == '__main__': main() RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源 2.2.3.死锁引入 1.多次获取导致死锁 小明想到了之前说的(互斥锁Lock读写都加锁)就把代码拆分研究了下: print("[开始]小明转账2000给小张") lock.acquire() # 获取锁 xiaoming -= 2000 xiaozhang += 2000 print("[开始]小明转账5000给小周") lock.acquire() # 获取锁(互斥锁第二次加锁) xiaoming -= 5000 xiaozhou += 5000 lock.release() # 释放锁 print("[结束]小明转账5000给小周") lock.release() # 释放锁 print("[开始]小明转账2000给小张") 输出发现:(第二次加锁的时候,变成阻塞等了【死锁】) [还钱前]小明8000,小张3000,小周5000 [开始]小明转账2000给小张 [开始]小明转账5000给小周 这种方式,Python提供的RLock就可以解决了 2.常见的死锁 看个场景:小明和小张需要流水帐,经常互刷~小明给小张转账1000,小张给小明转账1000 一般来说,有几个共享资源就加几把锁(小张、小明就是两个共享资源,所以需要两把Lock) 先描述下然后再看代码: 正常流程 小明给小张转1000:小明自己先加个锁==>小明-1000==>获取小张的锁==>小张+1000==>转账完毕 死锁情况 小明给小张转1000:小明自己先加个锁==>小明-1000==>准备获取小张的锁。可是这时候小张准备转账给小明,已经把自己的锁获取了,在等小明的锁(两个人相互等,于是就一直死锁了) 代码模拟一下过程: from time import sleep from multiprocessing.dummy import Pool as ThreadPool, Lock xiaoming = 5000 xiaozhang = 8000 m_lock = Lock() # 小明的锁 z_lock = Lock() # 小张的锁 # 小明转账1000给小张 def a_to_b(): global xiaoming global xiaozhang global m_lock global z_lock with m_lock: xiaoming -= 1000 sleep(0.01) with z_lock: xiaozhang += 1000 # 小张转账1000给小明 def b_to_a(): global xiaoming global xiaozhang global m_lock global z_lock with z_lock: xiaozhang -= 1000 sleep(0.01) with m_lock: xiaoming += 1000 def main(): print(f"[还钱前]小明{xiaoming},小张{xiaozhang}") p = ThreadPool() p.apply_async(a_to_b) p.apply_async(b_to_a) p.close() p.join() print(f"[还钱后]小明{xiaoming},小张{xiaozhang}") if __name__ == '__main__': main() 输出:(卡在这边了) [转账前]小明5000,小张8000 项目中像这类的情况,一般都是这几种解决方法:(还有其他解决方案,后面会继续说) 按指定顺序去访问共享资源 trylock的重试机制(Lock(False)) 在访问其他锁的时候,先把自己锁解了 得不到全部锁就先放弃已经获取的资源 比如上面的情况,我们如果规定,不管是谁先转账,先从小明开始,然后再小张,那么就没问题了。或者谁钱多就谁(权重高的优先) from time import sleep from multiprocessing.dummy import Pool as ThreadPool, Lock xiaoming = 5000 xiaozhang = 8000 m_lock = Lock() # 小明的锁 z_lock = Lock() # 小张的锁 # 小明转账1000给小张 def a_to_b(): global xiaoming global xiaozhang global m_lock global z_lock # 以上次代码为例,这边只修改了这块 with z_lock: # 小张权重高,大家都先获取小张的锁 xiaozhang += 1000 sleep(0.01) with m_lock: xiaoming -= 1000 # 小张转账1000给小明 def b_to_a(): global xiaoming global xiaozhang global m_lock global z_lock with z_lock: xiaozhang -= 1000 sleep(0.01) with m_lock: xiaoming += 1000 def main(): print(f"[转账前]小明{xiaoming},小张{xiaozhang}") p = ThreadPool() p.apply_async(a_to_b) p.apply_async(b_to_a) p.close() p.join() print(f"[转账后]小明{xiaoming},小张{xiaozhang}") if __name__ == '__main__': main() 输出: [转账前]小明5000,小张8000 [转账后]小明5000,小张8000 2.2.4.线程同步~条件变量Condition 条件变量一般都不是锁,只能能阻塞线程,从而减少不必要的竞争,Python内置了RLock(不指定就是RLock) 看看源码: class Condition: """ 实现条件变量的类。 条件变量允许一个或多个线程等到另一个线程通知它们为止 如果给出了lock参数而不是None,那必须是Lock或RLock对象作底层锁。 否则,一个新的RLock对象被创建并用作底层锁。 """ def __init__(self, lock=None): if lock is None: lock = RLock() self._lock = lock # 设置lock的acquire()和release()方法 self.acquire = lock.acquire self.release = lock.release 再看看可不可以进行with托管:(支持) def __enter__(self): return self._lock.__enter__() def __exit__(self, *args): return self._lock.__exit__(*args) 看个生产消费者的简单例子:(生产完就通知消费者) from multiprocessing.dummy import Pool as ThreadPool, Condition s_list = [] con = Condition() def Shop(i): global con global s_list # 加锁保护共享资源 for x in range(5): with con: s_list.append(x) print(f"[生产者{i}]生产商品{x}") con.notify_all() # 通知消费者有货了 def User(i): global con global s_list while True: with con: if s_list: print(f"列表商品:{s_list}") name =
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信