Python并发编程系列之多线程

1引言 2 创建线程   2.1 函数的方式创建线程   2.2 类的方式创建线程 3 Thread类的常用属性和方法   3.1 守护线程:Deamon   3.2 join()方法 4 线程间的同步机制   4.1 互斥锁:Lock   4.2 递归锁:RLock   4.3 Condition   4.4 信号量:Semaphore   4.5 事件:Event   4.6 定时器:Timer 5 线程间的通行   5.1队列:Queue 6 线程池 7 总结 1 引言   上一篇博文详细总结了Python进程的用法,这一篇博文来所以说Python中线程的用法。实际上,程序的运行都是以线程为基本单位的,每一个进程中都至少有一个线程(主线程),线程又可以创建子线程。线程间共享数据比进程要容易得多(轻而易举),进程间的切换也要比进程消耗CPU资源少。   线程管理可以通过thead模块(Python中已弃用)和threading 模块,但目前主要以threading模块为主。因为更加先进,有更好的线程支持,且 threading模块的同步原语远多于thread模块。另外,thread 模块中的一些属性会和 threading 模块有冲突。故,本文创建线程和使用线程都通过threading模块进行。   threading模块提供的类: Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。   threading 模块提供的常用方法:   threading.currentThread(): 返回当前的线程变量。   threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。   threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。   threading 模块提供的常量:   threading.TIMEOUT_MAX 设置threading全局超时时间。 2 创建线程   无论是用自定义函数的方法创建线程还是用自定义类的方法创建线程与创建进程的方法极其相似的,不过,创建线程时,可以不在“if __name__==”__main__:”语句下进行”。无论是哪种方式,都必须通过threading模块提供的Thread类进行。Thread类常用属性和方法如下。   Thread类属性:   name:线程名   ident:线程的标识符   daemon:布尔值,表示这个线程是否是守护线程   Thread类方法:   __init__(group=None,target=None,name=None,args=(),kwargs={},verbose=None,daemon=None):实例化一个线程对象,需要一个可调用的target对象,以及参数args或者kwargs。还可以传递name和group参数。daemon的值将会设定thread.daemon的属性   start():开始执行该线程   run():定义线程的方法。(通常开发者应该在子类中重写)   join(timeout=None):直至启动的线程终止之前一直挂起;除非给出了timeout(单位秒),否则一直被阻塞   isAlive:布尔值,表示这个线程是否还存活(驼峰式命名,python2.6版本开始已被取代)   isDaemon():布尔值,表示是否是守护线程   setDaemon(布尔值):在线程start()之前调用,把线程的守护标识设定为指定的布尔值   在下面两小节我们分别通过代码来演示。 2.1 自定义函数的方式创建线程 复制代码 import os import time import threading def fun(n): print('子线程开始运行……') time.sleep(1) my_thread_name = threading.current_thread().name#获取当前线程名称 my_thread_id = threading.current_thread().ident#获取当前线程id print('当前线程为:{},线程id为:{},所在进程为:{},您输入的参数为:{}'.format(my_thread_name ,my_thread_id , os.getpid(),n)) print('子线程运行结束……') t = threading.Thread(target=fun , name='线程1',args=('参数1',)) t.start() time.sleep(2) main_thread_name = threading.current_thread().name#获取当前线程名称 main_thread_id = threading.current_thread().ident#获取当前线程id print('主线程为:{},线程id为:{},所在进程为:{}'.format(main_thread_name ,main_thread_id , os.getpid())) 复制代码 2.2 类的方式创建线程 复制代码 import os import time import threading class MyThread(threading.Thread): def __init__(self , n , name=None): super().__init__() self.name = name self.n = n def run(self): print('子线程开始运行……') time.sleep(1) my_thread_name = threading.current_thread().name#获取当前线程名称 my_thread_id = threading.current_thread().ident#获取当前线程id print('当前线程为:{},线程id为:{},所在进程为:{},您输入的参数为:{}'.format(my_thread_name ,my_thread_id , os.getpid(),self.n)) print('子线程运行结束……') t = MyThread(name='线程1', n=1) t.start() time.sleep(2) main_thread_name = threading.current_thread().name#获取当前线程名称 main_thread_id = threading.current_thread().ident#获取当前线程id print('主线程为:{},线程id为:{},所在进程为:{}'.format(main_thread_name ,main_thread_id , os.getpid())) 复制代码   输出结果:   子线程开始运行……   当前线程为:线程1,线程id为:11312,所在进程为:4532,您输入的参数为:1   子线程运行结束……   主线程为:MainThread,线程id为:10868,所在进程为:4532   上述两块代码输出结果是一样的(id不一样),观察输出结果可以发现,子线程和主线程所在的进程都是一样的,证明是在同一进程中的进程。 3 Thread的常用方法和属性 3.1 守护线程:Deamon   Thread类有一个名为deamon的属性,标志该线程是否为守护线程,默认值为False,当为设为True是表示设置为守护线程。是否是守护线程有什么区别呢?我们先来看看deamon值为False(默认)情况时: 复制代码 import os import time import threading def fun(): print('子线程开始运行……') for i in range(6):#运行3秒,每秒输出一次 time.sleep(1) my_thread_name = threading.current_thread().name print('{}已运行{}秒……'.format(my_thread_name ,i+1)) print('子线程运行结束……') print('主线程开始运行……') t = threading.Thread(target=fun , name='线程1') print('daemon的值为:{}'.format(t.daemon)) t.start() for i in range(3): time.sleep(1) my_thread_name = threading.current_thread().name print('{}已运行{}秒……'.format(my_thread_name, i+1)) print('主线程结束运行……') 复制代码   输出结果:   主线程开始运行……   daemon的值为:False   子线程开始运行……   MainThread已运行1秒……   线程1已运行1秒……   MainThread已运行2秒……   线程1已运行2秒……   MainThread已运行3秒……   主线程结束运行……   线程1已运行3秒……   线程1已运行4秒……   线程1已运行5秒……   线程1已运行6秒……   子线程运行结束……   代码中,主线程只需要运行3秒即可结束,但子线程需要运行6秒,从运行结果中可以看到,主线程代码运行结束后,子线程还可以继续运行,这就是非守护线程的特征。 再来看看daemon值为True时: 复制代码 import time import threading def fun(): print('子线程开始运行……') for i in range(6):#运行3秒,每秒输出一次 time.sleep(1) my_thread_name = threading.current_thread().name print('{}已运行{}秒……'.format(my_thread_name ,i+1)) print('子线程运行结束……') print('主线程开始运行……') t = threading.Thread(target=fun , name='线程1') t.daemon=True #设置为守护线程 print('daemon的值为:{}'.format(t.daemon)) t.start() for i in range(3): time.sleep(1) my_thread_name = threading.current_thread().name print('{}已运行{}秒……'.format(my_thread_name, i+1)) print('主线程结束运行……') 复制代码   输出结果:   主线程开始运行……   daemon的值为:True   子线程开始运行……   MainThread已运行1秒……   线程1已运行1秒……   MainThread已运行2秒……   线程1已运行2秒……   MainThread已运行3秒……   主线程结束运行……   从运行结果中可以看出,当deamon值为True,即设为守护线程后,只要主线程结束了,无论子线程代码是否结束,都得跟着结束,这就是守护线程的特征。另外,修改deamon的值必须在线程start()方法调用之前,否则会报错。 3.2 join()方法   join()方法的作用是在调用join()方法处,让所在线程(主线程)同步的等待被join的线程(下面的p线程),只有p线程结束。我们尝试在不同的位置调用join方法,对比运行结果。首先在p线程一开始的位置进行join: 复制代码 import time import threading def fun(): print('子线程开始运行……') for i in range(6):#运行3秒,每秒输出一次 time.sleep(1) my_thread_name = threading.current_thread().name print('{}已运行{}秒……'.format(my_thread_name ,i+1)) print('子线程运行结束……') print('主线程开始运行……') t = threading.Thread(target=fun , name='线程1') t.daemon=True #设置为守护线程 t.start() t.join() #此处进行join for i in range(3): time.sleep(1) my_thread_name = threading.current_thread().name print('{}已运行{}秒……'.format(my_thread_name, i+1)) print('主线程结束运行……') 复制代码   输出结果:   主线程开始运行……   子线程开始运行……   线程1已运行1秒……   线程1已运行2秒……   线程1已运行3秒……   线程1已运行4秒……   线程1已运行5秒……   线程1已运行6秒……   子线程运行结束……   MainThread已运行1秒……   MainThread已运行2秒……   MainThread已运行3秒……   主线程结束运行……   可以看出,等子线程运行完之后,主线程才继续join下面的代码。然后在主线程即将结束时进行join: 复制代码 import time import threading def fun(): print('子线程开始运行……') for i in range(6):#运行3秒,每秒输出一次 time.sleep(1) my_thread_name = threading.current_thread().name print('{}已运行{}秒……'.format(my_thread_name ,i+1)) print('子线程运行结束……') print('主线程开始运行……') t = threading.Thread(target=fun , name='线程1') t.daemon=True #设置为守护线程 t.start() for i in range(3): time.sleep(1) my_thread_name = threading.current_thread().name print('{}已运行{}秒……'.format(my_thread_name, i+1)) t.join() print('主线程结束运行……') 复制代码   输出结果:   主线程开始运行……   子线程开始运行……   MainThread已运行1秒……   线程1已运行1秒……   MainThread已运行2秒……   线程1已运行2秒……   MainThread已运行3秒……   线程1已运行3秒……   线程1已运行4秒……   线程1已运行5秒……   线程1已运行6秒……   子线程运行结束……   主线程结束运行……   上面代码中,子线程是设置为守护线程的,如果没有调用join()方法主线程3秒结束,子线程也会跟着结束,但是从运行结果中我们可以看出,主线程3秒后,陷入等待,等子线程运行完之后,才会继续下面的代码。 4 线程间的同步机制   在默认情况在,多个线程之间是并发执行的,这就可能给数据代码不安全性,例如有一个全局变量num=10,线程1、线程2每次读取该变量后在原有值基础上减1。但,如果线程1读取num的值(num=10)后,还没来得及减1,CPU就切换去执行线程2,线程2也去读取num,这时候读取到的值也还是num=10,然后让num=9,这是CPU有切换回线程1,因为线程1读取到的值是原来的num=10,所以做减1运算后,也做出num=9的结果。两个线程都执行了该任务,但最后的值可不是8。如下代码所示: 复制代码 import time import threading def fun(): global num temp = num time.sleep(0.2) temp -= 1 num = temp print('主线程开始运行……') t_lst = [] num =10 # 全局变量 for i in range(10): t = threading.Thread(target=fun) t_lst.append(t) t.start() [t.join() for t in t_lst] print('num最后的值为:{}'.format(num)) print('主线程结束运行……') 复制代码   输出结果:   主线程开始运行……   num最后的值为:9   主线程结束运行……   最后结果为9,不是0。这就造成了数据混乱。所以,就有了线程同步机制。 4.1 互斥锁:Lock   线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。互斥锁为资源设置一个状态:锁定和非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。 复制代码 import time import threading def fun(lock): lock.acquire() global num temp = num time.sleep(0.2) temp -= 1 num = temp lock.release() print('主线程开始运行……') t_lst = [] num =10 # 全局变量 lock = threading.Lock() for i in range(10): t = threading.Thread(target=fun , args=(lock,)) t_lst.append(t) t.start() [t.join() for t in t_lst] print('num最后的值为:{}'.format(num)) print('主线程结束运行……') 复制代码   输出结果:   主线程开始运行……   num最后的值为:0   主线程结束运行……   可以看到,最后输出结果为0,值正确。当然,如果你运行了上述两块代码,你就会发现,使用了锁之后,代码运行速度明显降低,这是因为线程由原来的并发执行变成了串行,不过数据安全性得到保证。   使用Lock的时候必须注意是否会陷入死锁,所谓死锁是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。关于死锁一个著名的模型是“科学家吃面”模型: 复制代码 import time from threading import Thread from threading import Lock def eatNoodles_1(noodle_lock, fork_lock, scientist): noodle_lock.acquire() print('{} 拿到了面'.format(scientist)) fork_lock.acquire() print('{} 拿到了叉子'.format(scientist)) time.sleep(1) print('{} 吃到了面'.format(scientist)) fork_lock.release() noodle_lock.release() print('{} 放下了面'.format(scientist)) print('{} 放下了叉子'.format(scientist)) def eatNoodles_2(noodle_lock, fork_lock, scientist): fork_lock.acquire() print('{} 拿到了叉子'.format(scientist)) noodle_lock.acquire() print('{} 拿到了面'.format(scientist)) print('{} 吃到了面'.format(scientist)) noodle_lock.release() print('{} 放下了面'.format(scientist)) fork_lock.release() print('{} 放下了叉子'.format(scientist)) scientist_list1 = ['霍金','居里夫人'] scientist_list2 = ['爱因斯坦','富兰克林'] noodle_lock = Lock() fork_lock = Lock() for i in scientist_list1: t = Thread(target=eatNoodles_1, args=(noodle_lock, fork_lock, i)) t.start() for i in scientist_list2: t = Thread(target=eatNoodles_2, args=(noodle_lock, fork_lock, i)) t.start() 复制代码   输出结果:   霍金 拿到了面   霍金 拿到了叉子   霍金 吃到了面   霍金 放下了面   霍金 放下了叉子   爱因斯坦 拿到了叉子   居里夫人 拿到了面   霍金吃完后,爱因斯坦拿到了叉子,把叉子锁住了;居里夫人拿到了面,把面锁住了。爱因斯坦就想:居里夫人不给我面,我就吃不了面,所以我不给叉子。居里夫人就想:爱因斯坦不给我叉子我也吃不了面,我就不给叉子。所以就陷入了死循环。   为了解决Lock死锁的情况,就有了递归锁:RLock。 4.2 递归锁:RLock   所谓的递归锁也被称为“锁中锁”,指一个线程可以多次申请同一把锁,但是不会造成死锁,这就可以用来解决上面的死锁问题。 复制代码 import time from threading import Thread from threading import RLock def eatNoodles_1(noodle_lock, fork_lock, scientist): noodle_lock.acquire() print('{} 拿到了面'.format(scientist)) fork_lock.acquire() print('{} 拿到了叉子'.format(scientist)) time.sleep(1) print('{} 吃到了面'.format(scientist)) fork_lock.release() noodle_lock.release() print('{} 放下了面'.format(scientist)) print('{} 放下了叉子'.format(scientist)) def eatNoodles_2(noodle_lock, fork_lock, scientist): fork_lock.acquire() print('{} 拿到了叉子'.format(scientist)) noodle_lock.acquire() print('{} 拿到了面'.format(scientist)) print('{} 吃到了面'.format(scientist)) noodle_lock.release() print('{} 放下了面'.format(scientist)) fork_lock.release() print('{} 放下了叉子'.format(scientist)) scientist_list1 = ['霍金','居里夫人'] scientist_list2 = ['爱因斯坦','富兰克林'] noodle_lock=fork_lock = RLock() for i in scientist_list1: t = Thread(target=eatNoodles_1, args=(noodle_lock, fork_lock, i)) t.start() for i in scientist_list2: t = Thread(target=eatNoodles_2, args=(noodle_lock, fork_lock, i)) t.start() 复制代码   上面代码可以正常运行到所有科学家吃完面条。   RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁,二者的区别是:递归锁可以连续acquire多次,而互斥锁只能acquire一次 4.3 Condition   Condition可以认为是一把比Lock和RLOK更加高级的锁,其在内部维护一个琐对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。Condition也提供了acquire, release方法,其含义与琐的acquire, release方法一致,其实它只是简单的调用内部琐对象的对应的方法而已。Condition内部常用方法如下:   1)acquire(): 上线程锁   2)release(): 释放锁   3)wait(timeout): 线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。   4)notify(n=1): 通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。   5)notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程   需要注意的是,notify()方法、notifyAll()方法只有在占用琐(acquire)之后才能调用,否则将会产生RuntimeError异常。   用Condition来实现生产者消费者模型: 复制代码 import threading import time # 生产者 def produce(con): # 锁定线程 global num con.acquire() print("工厂开始生产……") while True: num += 1 print("已生产商品数量:{}".format(num)) time.sleep(1) if num >= 5: print("商品数量达到5件,仓库饱满,停止生产……") con.notify() # 唤醒消费者 con.wait()# 生产者自身陷入沉睡 # 释放锁 con.release() # 消费者 def consumer(con): con.acquire() global num print("消费者开始消费……") while True: num -= 1 print("剩余商品数量:{}".format(num)) time.sleep(2) if num <= 0: print("库存为0,通知工厂开始生产……") con.notify() # 唤醒生产者线程 con.wait() # 消费者自身陷入沉睡 con.release() con = threading.Condition() num = 0 p = threading.Thread(target=produce , args=(con ,)) c = threading.Thread(target=consumer , args=(con ,)) p.start() c.s
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信