# 52. threading线程管理模块、queue队列模块、concurrent线程池模块
# 一点理论知识
全局解释器锁GIL
Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然 Python 解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解释器中运行。
对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
在多线程环境中,Python 虚拟机按以下方式执行:
设置 GIL;
切换到一个线程去运行;
运行指定数量的字节码指令或者线程主动让出控制(可以调用 time.sleep(0));
把线程设置为睡眠状态;
解锁 GIL;
再次重复以上所有步骤。
在调用外部代码(如 C/C++扩展函数)的时候,GIL将会被锁定,直到这个函数结束为止(由于在这期间没有Python的字节码被运行,所以不会做线程切换)编写扩展的程序员可以主动解锁GIL。
python线程模块的选择
Python提供了几个用于多线程编程的模块,包括thread、threading和Queue等。
thread和threading模块允许程序员创建和管理线程。
thread模块提供了基本的线程和锁的支持,
threading提供了更高级别、功能更强的线程管理的功能。
Queue模块允许用户创建一个可以用于多个线程之间共享数据的队列数据结构。
避免使用thread模块,因为更高级别的threading模块更为先进,对线程的支持更为完善,而且使用thread模块里的属性有可能会与threading出现冲突;其次低级别的thread模块的同步原语很少(实际上只有一个),而threading模块则有很多;再者,thread模块中当主线程结束时,所有的线程都会被强制结束掉,没有警告也不会有正常的清除工作,至少threading模块能确保重要的子线程退出后进程才退出。
thread模块不支持守护线程,当主线程退出时,所有的子线程不论它们是否还在工作,都会被强行退出。而threading模块支持守护线程,守护线程一般是一个等待客户请求的服务器,如果没有客户提出请求它就在那等着,如果设定一个线程为守护线程,就表示这个线程是不重要的,在进程退出的时候,不用等待这个线程退出。
# Threading.Thread线程模块
Thread模块是一个创建线程的模块,借助这个模块,就可以完成线程的创建跟使用。
# Thread模块使用简单介绍
Thread(group,target,name,args,kwargs),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
#常用
target= ##指定某一个方法或类开启子进程线程并运行,表示调用对象,即子进程要执行的任务
args=(,) ##指定传输给target函数位置的数据,元组形式,需要有逗号,表示调用对象的位置参数元组,args=(1,2,'egon',)
# 其他参数
group #参数未使用,值始终为None
kwargs #表示调用对象的字典,kwargs={'name':'egon','age':18}
name #为子进程的名称
# Thread类的其他方法
Thread实例对象的方法
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。
threading模块提供的一些方法:
threading.currentThread() # 返回当前的线程对象。
threading.current_thread() # 返回当前的线程对象,和上述方法一样。
threading.enumerate() # 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount() # 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
threading.active_count() # 返回正在运行的线程数量,和上述方法一样
# Thread模块方法介绍
# 启动进程 - .start()
启动进程,并调用该子进程中的p.run()
from threading import Thread
def so(n):
print(1)
t = Thread(target=so,args=(1,))
t.start()
执行结果:
1
**注意:**启动进程的时候要加上if name == 'main':,为什么要加呢,因为开启进程,Python解释器需要你要当前执行文件开启,如果判断你是否在当前文件执行开启,还是用自定义模块开启,所以就需要加上if name == 'main':
# 启动进程 - .run()
进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
from threading import Thread
def so(n):
print(1)
t = Thread(target=so,args=(1,))
t.run()
执行结果:
1
# 强制终止进程 - .terminate()
强制终止进程,不会进行任何清理操作,如果创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果还保存了一个锁那么也将不会被释放,进而导致死锁
from threading import Thread
import time
def so(n):
time.sleep(1)
print(1)
t = Thread(target=so,args=(1,))
t.start()
t.terminate()
使用了time模块的停止方法,让子进程暂时保留,让terminate()去结束这个子进程
# 检查子进程是否存活 - .is_alive()
如果子进程仍然运行,返回True
from threading import Thread
import time
def so(n):
time.sleep(1)
print(1)
t = Thread(target=so,args=(1,))
t.start()
print(t.is_alive())
执行结果:
True
1
# 等待子进程执行完毕 - .join()
主进程等待终止(强调:是主进程处于等的状态,而子进程是处于运行的状态)。timeout是可选的超时时间,需要强调的是,.join只能join住start开启的进程,而不能join住run开启的进程
from threading import Thread
import time
def so(n):
time.sleep(1)
print(1)
t = Thread(target=so,args=(1,))
t.start()
t.join()
print(2)
执行结果:
1
2
# 主进程作为子进程的守护进程 .daemon()
默认值为False,如果设为True,代表子进程为后台运行的守护进程,当子进程的父进程终止时,子进程也随之终止,并且设定为True后,子进程不能创建自己的新进程,必须在.start()之前设置
from threading import Thread
import time
def so(n):
time.sleep(1)
print(1)
t = Thread(target=so,args=(1,))
t.daemon = True
t.start()
# 设置并查看子进程的名称 - .name
from threading import Thread
import time
def so(n):
time.sleep(1)
print(1)
t = Thread(target=so,args=(1,))
t.start()
t.name = "name" ## 设置子进程名称
print(t.name) ## 查看子进程名称
执行结果:
name
1
# 查看子进程的PID - .pid()
from threading import Thread
import time
def so(n):
time.sleep(1)
print(1)
t = Thread(target=so,args=(1,))
t.start()
print(t.pid)
执行结果:
17332
1
# 不常用的方法
.exitcode
## 进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
.authkey
##进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
# Thread模块创建线程
# 常规版
from threading import Thread
def a():
print("看什么看")
t = Thread(target=a)
t.start()
print("我就看你,咋地")
# 类版
from threading import Thread
class a(Thread):
def __init__(self):
super(a,self).__init__()
def run(self):
print("看什么看")
t = a()
t.start()
print("我就看你,咋地")
# 实验 - 进程跟线程的效率对比
from multiprocessing import Process
from threading import Thread
import time
def func():
pass
if __name__ == '__main__':
start = time.time()
for i in range(100):
p = Process(target=func)
p.start()
print('开100个进程的时间:',time.time() - start)
start = time.time()
for i in range(100):
p = Thread(target=func)
p.start()
print('开100个线程的时间:', time.time() - start)
执行结果:
开100个进程的时间: 9.228498935699463
开100个线程的时间: 0.337414026260376
## 效率对比,线程完胜进程
## cpu切换进程要比cpu切换线程 慢很多
## 在python中,如果IO操作过多的话,使用多线程最好了
## 使用TCP协议也是属于IO操作的一种
# 实验 - 进程跟线程的PID对比
from multiprocessing import Process
from threading import Thread
import time,os
def func(name):
print('我是一个%s,我的pid是%s'%(name,os.getpid()))
if __name__ == '__main__':
print('我是main,我的pid是%s'%(os.getpid()))
for i in range(2):
p = Process(target=func,args=('进程',))
p.start()
for i in range(2):
p = Thread(target=func,args=('线程',))
p.start()
执行结果:
我是main,我的pid是9284
我是一个线程,我的pid是9284
我是一个线程,我的pid是9284
我是一个进程,我的pid是5972
我是一个进程,我的pid是9032
## 在同一个进程内,所有线程共享这个进程的pid,也就是说所有线程共享所属进程的所有资源和内存地址
# 实验 - 线程的共享数据问题
from threading import Thread,Lock
import time,os
def func():
global num
tmp = num
time.sleep(0.00001)
num = tmp - 1
if __name__ == '__main__':
num = 100
t_l = []
for i in range(100):
t = Thread(target=func)
t.start()
t_l.append(t)
# time.sleep(1)
[t.join() for t in t_l]
print(num)
执行结果:
# 执行的结果每次都在变化
## 在同一个进程内,所有线程共享该进程中的全局变量,但是线程之间不能共享
## 因为GIL限制在同个时间片中只能运行一个线程,线程是无法做到并行的效果的
## 因为有了GIL数据如果没有进行加锁之类的操作,可能会产生数据混乱的情况
## 因为有GIL锁的存在,在Cpython中,没有真正的线程并行。但是有真正的多进程并行
## 当你的任务是计算密集的情况下,使用多进程好
## 总结:在CPython中,IO密集用多线程,计算密集用多进程
# 实验 - 线程的守护进程
注意:线程的守护进程,是要等所属进程完全结束,并不是代码执行结果,线程才会退出
进程的守护进程,是主进程的代码执行完毕后,立即退出
from threading import Thread
import time
def func():
time.sleep(2)
print(123)
t = Thread(target=func)
t.daemon = True
t.start()
# 守护线程是根据主线程执行结束才结束
# 守护线程不是根据主线程的代码执行结束而结束
# 主线程会等待普通线程执行结束,再结束
# 守护线程会等待主线程结束,再结束
# 所以,一般把不重要的事情设置为守护线程
# 守护进程是根据主进程的代码执行完毕,守护进程就结束
# 对主进程来说,运行完毕指的是主进程代码运行完毕
# 对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
# 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
# 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
# 守护进程是随着父进程的代码执行结束而结束
# 守护线程不是随着父线程的代码执行结束而结束
# 守护线程是随着父线程的执行结束而结束
# 实验 - 多线程的socket聊天
## 服务端
from threading import Thread
import socket
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
def func(conn):
while 1:
info = conn.recv(1024).decode('utf-8')
print(info)
conn.send(info.upper().encode('utf-8'))
if __name__ == '__main__':
while 1:
conn,addr = sk.accept()
Thread(target=func,args=(conn,)).start()
## 客户端
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8080))
while 1:
msg_s = input('>>>')
sk.send(msg_s.encode('utf-8'))
print(sk.recv(1024).decode('utf-8'))
# Threading.Lock互斥锁模块
互斥锁:一把钥匙配一把锁
# Lock模块的功能介绍
Lock模块的功能就只有二个,一个是解锁,一个是锁上
from threading import Lock
lc = Lock() ## 实例化Lock类,不用加参数
lc.acquire() ## 锁上,把以下的资源全部锁上,不让其他进程操作,除非解锁了
lc.release() ## 解锁,把以上的资源释放了,可以让其他进程进行操作
# 锁的用处
当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题,
可以使用加锁的形式确保了程序的顺序执行,但是执行又变成了串行,降低了效率,但是不得不说,它确保了数据的安全性。
让一份数据资源,保证只有一个进程进行操作,这样才不会出现数据混乱的现象
# Lock模块的简单使用
from threading import Lock
l = Lock()# 一把钥匙配一把锁
l.acquire()
print('abc')
l.acquire()# 程序会阻塞住 陷入死锁了
print(123)
## 这里实现了,但是程序因为拿了钥匙没还,现在遇到另一把锁,就无法获取到钥匙,进入了死锁状态
# 实验 - 死锁的模块
from threading import Thread,Lock
import time,os
def man(l_tot,l_pap):
l_tot.acquire()# 是男的获得厕所资源,把厕所锁上了
print('小年在厕所上厕所')
time.sleep(1)
l_pap.acquire()# 男的拿纸资源
print('小年拿到卫生纸了!')
time.sleep(0.5)
print('小年完事了!')
l_pap.release()# 男的先还纸
l_tot.release()# 男的还厕所
def woman(l_tot,l_pap):
l_pap.acquire() # 女的拿纸资源
print('小雪拿到卫生纸了!')
time.sleep(1)
l_tot.acquire() # 是女的获得厕所资源,把厕所锁上了
print('小雪在厕所上厕所')
time.sleep(0.5)
print('小雪完事了!')
l_tot.release() # 女的还厕所
l_pap.release() # 女的先还纸
if __name__ == '__main__':
l_tot = Lock()
l_pap = Lock()
t_man = Thread(target=man,args=(l_tot,l_pap))
t_woman = Thread(target=woman,args=(l_tot,l_pap))
t_man.start()
t_woman.start()
## 实例化出二个锁跟二个钥匙,分别锁到厕所跟卫生纸中,钥匙也放进去,现在同时有二个人,一男一女,男的先去了厕所,就拿走了厕所的资源,等于拿走了厕所的钥匙,并把厕所锁上了,别一个女的,先去了卫生纸,获取了卫生纸的资源,拿走了钥匙并锁上了,这时女的就在等男的上完厕所,过会,男的上完了,在找卫生纸,但是卫生纸的钥匙在女的手里,所以他得不到卫生纸,没卫生纸擦屁股,谁敢出去,所以男的就一直等,等到可以获取卫生纸的钥匙,
## 男的等卫生纸的资源 ,女的等厕所资源,所以进入了死锁状态
注意:使用互斥锁是不能解决死锁的问题,但是可以使用以下的模块,递归锁模块来解决死锁的问题
# Threading.RLock递归锁模块
递归锁:可以有无止尽的锁,但是会有一把万能钥匙,这把钥匙可以开所有的锁
# RLock模块的功能介绍
RLock模块的功能就只有二个,一个是锁,一个是钥匙,锁可以无限的上,钥匙是万能钥匙,能开所有的锁
from threading import RLock
rc = RLock() ## 实例化Lock类,不用加参数
rc.acquire() ## 锁上,把以下的资源全部锁上,不让其他进程操作,除非解锁了
rc.release() ## 解锁,把以上的资源释放了,可以让其他进程进行操作
# RLcok模块的简单使用
from threading import RLock
s = RLock()
s.acquire()
s.acquire()
s.acquire()
s.acquire()
s.acquire()
s.acquire()
print(123)
执行结果:
123
## 只要是同一个实例化对象,那么锁对于有钥匙的对象来说,都是虚无的,都可以开
# 实验 - 解决死锁
from threading import Thread,RLock
import time
def man(l_tot,l_pap):
l_tot.acquire()# 是男的获得厕所资源,把厕所锁上了
print('小年在厕所上厕所')
time.sleep(1)
l_pap.acquire()# 男的拿纸资源
print('小年拿到卫生纸了!')
time.sleep(0.5)
print('小年完事了!')
l_pap.release()# 男的先还纸
l_tot.release()# 男的还厕所
def woman(l_tot,l_pap):
l_pap.acquire() # 女的拿纸资源
print('小雪拿到卫生纸了!')
time.sleep(1)
l_tot.acquire() # 是女的获得厕所资源,把厕所锁上了
print('小雪在厕所上厕所')
time.sleep(0.5)
print('小雪完事了!')
l_tot.release() # 女的还厕所
l_pap.release() # 女的先还纸
if __name__ == '__main__':
l_tot = l_pap = RLock()
t_man = Thread(target=man,args=(l_tot,l_pap))
t_woman = Thread(target=woman,args=(l_tot,l_pap))
t_man.start()
t_woman.start()
# 实例化出一把万能锁跟无限的锁,把厕所跟卫生纸的资源全锁上,男的跟女的,就要看谁先到,因为钥匙只有一把,但是这把钥匙能解开所有的锁,就是说,这把钥匙可以解开厕所跟卫生纸的锁并获取他们的资源
# 这样就不会进入死锁状态
# RLock是递归锁 --- 是无止尽的锁,但是所有锁都有一个共同的钥匙
# 想解决死锁,配一把公共的钥匙就可以了。
# Threading.Semaphorew信号量模块
信号量则是多把钥匙配备多把锁,也就是说同时允许锁住多个数据。
信号量同步基于内部计数器,用户初始化一个计数器初值,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1。当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
# Semaphore模块的功能介绍
Semaphore模块的功能就只有二个,一个是解锁,一个是锁上,能解锁的数量是由实例化的时候控制的
from threading import Semaphore
s = Semaphore(4) ## 实例化一个对象,参数为 n ,能有几个可以解锁的,4为有4个钥匙可以解锁
s.acquire() ## 锁上,拿走一把钥匙
s.release() ## 解锁,释放一把钥匙
# Semaphore模块的简单使用
from threading import Semaphore
s = Semaphore(3)
s.acquire()
print(123)
s.acquire()
print(456)
s.acquire()
print(789)
s.acquire()
print(111)
执行结果:
123
456
789
## 程序执行到789之后的s.acquire(),就会进入阻塞,等待程序释放钥匙
# 实验 - 模拟休息室
from threading import Semaphore,Thread
import time
def func(sem,i):
sem.acquire()
print('第%s个人进入屋子'%i)
time.sleep(2)
print('第%s个人离开屋子'%i)
sem.release()
sem = Semaphore(5)
for i in range(20):
t = Thread(target=func,args=(sem,i))
t.start()
## 模拟休息室,休息室最多可以供五个人休息,来了20个人,先进去五个人,剩下的只能排队在外面等,出来一个就能进去一个
# Threading.Event事件模块
python中的事件机制,主要用于主进程控制其他进程的执行
- 线程的一个关键特性是每个线程都是独立运行且状态不可预测。
- 如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。
- 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。默认Event对象中的信号标志被设置为假。
- 如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。
- 一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。
- 如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。
# Event模块的功能介绍
事件主要提供了三个方法 set、wait、clear
事件处理的机制:全局定义了一个“Flag”(event.is_set()),如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
from threading import Event
e = Event() #实例化一个对象
e.set() #将is_set()设为True
e.clear() # 将is_set()设为False
e.wait() #判断is_set的bool值,如果bool为True,则非阻塞,bool值为False,则阻塞
e.is_set() # 标识,查看bool值
# 事件是通过is_set()的bool值,去标识e.wait() 的阻塞状态
# 当is_set()的bool值为False时,e.wait()是阻塞状态
# 当is_set()的bool值为True时,e.wait()是非阻塞状态
# 当使用set()时,是把is_set的bool变为True
# 当使用clear()时,是把is_set的bool变为False
# Event模块的简单使用
from threading import Event
print(e.is_set())# False wait应该是阻塞住
e.set()# 将is_set 的bool值变为True,将wait变为非阻塞
e.wait()
print(e.is_set())
执行结果:
False
True
#################################################################################
from threading import Event
print(e.is_set())# False wait应该是阻塞住
e.set()# 将is_set 的bool值变为True,将wait变为非阻塞
e.wait()
print(e.is_set())
print(123)
e.clear()
print(e.is_set())
e.wait()
print(123)
执行结果:
False
True
123
False
## 程序进行阻塞等待
# 实验 - 模拟数据库连接
from threading import Thread,Event
import time
import random
def conn_mysql(e,i):
count = 1
while 1:
if e.is_set():# 如果为True,就是可以连接上数据库
break
if count > 3:
print('连接超时')
return
print('第%s个人正在尝试第%s次连接!'%(i,count))
e.wait(0.5)# 在这里阻塞等待0.5秒,模拟用户连接时的等待
count+=1
print('第%s个人连接成功'%i)
def check_mysql(e):
print('\033[45m 数据库正在维护 \033[0m')# 让数据库初始状态处于维护状态,默认所有用户连接不上
time.sleep(random.randint(1,2))# 随机1秒或2秒,如果随机1秒的话,用户就可以连接上,2秒就连接不上
e.set()# 将e.is_set()设为True
if __name__ == '__main__':
e = Event()
t = Thread(target=check_mysql,args=(e,))
t.start()
for i in range(10):# 产生10个线程都去尝试连接数据库
t1 = Thread(target=conn_mysql,args=(e,i))
t1.start()
# Threading.Condition条件模块
使得线程等待,只有满足某条件时,才释放指定几个线程
Python提供的Condition对象提供了对复杂线程同步问题的支持。
Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。
不断的重复这一过程,从而解决复杂的同步问题。
# Condition模块的功能介绍
from threading import Condition
acquire() ## 锁
release() ## 解锁
## 这二个方法的底层是引用RLock递归锁
wait() ## 是指让线程阻塞住
notify(int) ## 是指给wait发一个信号,让wait变成不阻塞
int ## 是指,你要给多少给wait发信号
# 实例 - 模拟进行
from threading import Thread,Condition
def func(con,i):
con.acquire() ## 锁上,只有当前线程可以调用
con.wait()# 线程执行到这里,会阻塞住,等待notify发送信号,来唤醒此线程
con.release() ## 解锁
print('第%s个线程开始执行了!'%i)
if __name__ == '__main__':
con = Condition()
for i in range(10):
t = Thread(target=func,args=(con,i))
t.start()
while 1:
num = int(input(">>>"))
con.acquire() ## 锁上
con.notify(num)# 发送一个信号给num个正在阻塞在wait的线程,让这些线程正常执行
con.release() ## 解锁
# 主线程和10个子线程都在抢夺递归锁的一把钥匙。
# 如果主线程抢到钥匙,主线程执行while 1,input,然后notify发信号,还钥匙。但是,此时如果主线程执行特别快
# 极有可能接下来主线程又会拿到钥匙,那么此时哪怕其他10个子线程的wait接收到信号,但是因为没有拿到钥匙,所以其他子线程还是不会执行
# Threading.Timer定时器模块
定时器,指定n秒后执行某个操作
# Timer模块的功能介绍
from threading import Timer
Timer(2.5,func).start()
# Timer(time,func)
# time:睡眠的时间,以秒为单位
# func:睡眠时间之后,需要执行的任务
# .start() :启动
# Timer模块的简单使用
from threading import Timer
def func():
print('就是这么nb!')
Timer(2.5,func).start()
## 执行时,先阻塞睡眠2.5秒,在执行
# queue队列模块 - 同一进程内的队列
queue队列 :使用import queue,用法与进程Queue一样
为什么说,queue队列是同一进程内的队列,因为这模块的队列不能跨进程,就只能在当前进程中使用
- 比如,我在代码中使用了queue队列,但是使用了子进程,发送了子进程无法使用父进程的队列
- 进程,拥有所有的资源单位,每个进程的资源都是独立的,包含子进程跟父进程
- 但是,线程:无法拥有资源,只能依赖所属进程的资源,所以线程中使用的队列就是queue队列
- 线程,无法拥有资源,依赖所属进程的资源,可以使用所属进程的资源,但是每个线程中又是独立的个体
queue模块目前学习三种队列
- queue.Queue():遵守先进先出规则
- queue.LifoQueue():遵守后进先出、先进后出的规则
- queue.PriorityQueue():遵守优先级先出的规则
- 优先级的比较(首先保证整个队列中,所有表示优先级的东西类型必须一致)
- 如果都是int,比数值的大小
- 如果都是str,比较字符串的大小(从第一个字符的ASCII码开始比较)
- 优先级的比较(首先保证整个队列中,所有表示优先级的东西类型必须一致)
# 必看
以下三个模块的方法跟使用都跟进程的multiprocess.Queue模块基本一样,所以这样不会写太多实验之类的了,如果有需要,可以去看一下进程的multiprocess.Queue模块,或自行百度
# queue.Queue队列模块 - 先进先出
跟进程的multiprocess.Queue模块基本一样,可以去考虑一下
# queue.Queue模块的简单功能介绍
目前只介绍二个功能,一个是存,一个是取
import queue
qe = queue.Queue(n) ## 实例化,n:代表指定队列容量,如果不指定,默认是无限制
## 取值
qe.get() ## 接收数据,从队列获取值,阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待
qe.get_nowait() # 不接收数据,从队列获取值,阻塞,如果有数据直接获取,没有数据就报错
## 存值
qe.put() ## 存入数据,把数据存入在队列中,阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待
qe.put_nowait() ## 存入数据,把数据存入在队列中,不阻塞,如果可以继续往队列中放数据,就直接放,不能放就报错
# queue.Queue模块的简单使用
import queue
qe = queue.Queue()
for i in range(100):
qe.put(i)
for i in range(100):
print(qe.get())
## 循环存入1-99,循环取出1-99
# queue.LifoQueue队列模块 - 后进先出
跟进程的multiprocess.Queue模块基本一样,可以去考虑一下
# queue.LifoQueue模块的简单功能介绍
目前只介绍二个功能,一个是存,一个是取
import queue
lq = queue.LifoQueue(n) ## 实例化,n:代表指定队列容量,如果不指定,默认是无限制
## 取值
lq.get() ## 接收数据,从队列获取值,阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待
lq.get_nowait() # 不接收数据,从队列获取值,阻塞,如果有数据直接获取,没有数据就报错
## 存值
lq.put() ## 存入数据,把数据存入在队列中,阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待
lq.put_nowait() ## 存入数据,把数据存入在队列中,不阻塞,如果可以继续往队列中放数据,就直接放,不能放就报错
# queue.LifoQueue模块的简单使用
import queue
lq = queue.LifoQueue()
for i in range(5):
lq.put(i)
for i in range(6):
print(lq.get())
执行结果:
4
3
2
1
0
# queue.PriorityQueue队列模块 - 优先级先出
跟进程的multiprocess.Queue模块基本一样,可以去考虑一下
# queue.PriorityQueue模块的简单功能介绍
目前只介绍二个功能,一个是存,一个是取
import queue
pq = queue.PriorityQueue(n) ## 实例化,n:代表指定队列容量,如果不指定,默认是无限制
## 取值
pq.get() ## 接收数据,从队列获取值,阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待
pq.get_nowait() # 不接收数据,从队列获取值,阻塞,如果有数据直接获取,没有数据就报错
## 存值
pq.put((优先级,数据)) ## 存入数据,把数据存入在队列中,阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待
pq.put_nowait((优先级,数据)) ## 存入数据,把数据存入在队列中,不阻塞,如果可以继续往队列中放数据,就直接放,不能放就报错
# 元组中第一个参数是:表示当前数据的优先级
# 元组中第二个参数是:需要存放到队列中的数据
# 优先级的比较(首先保证整个队列中,所有表示优先级的东西类型必须一致)
# 如果都是int,比数值的大小
# 如果都是str,比较字符串的大小(从第一个字符的ASCII码开始比较)
# 优先级队列,put()方法接收的是一个元组(),第一个位置是优先级,第二个位置是数据
# 优先级如果是数字,直接比较数值
# 如果是字符串,已字符串的第一位字符按照 ASCII 码比较的。当ASCII码相同时,会按照先进先出的原则
# queue.LifoQueue模块的简单使用
import queue
pq = queue.PriorityQueue()
pq.put((2,"陈"))
pq.put((1,"江"))
pq.put((12,"李"))
pq.put((8,"龙"))
pq.put((24,"唐"))
for i in range(5):
get = pq.get()
print(get)
print("优先级:%s,数据:%s"%(get[0],get[1]))
执行结果:
(1, '江')
优先级:1,数据:江
(2, '陈')
优先级:2,数据:陈
(8, '龙')
优先级:8,数据:龙
(12, '李')
优先级:12,数据:李
(24, '唐')
优先级:24,数据:唐
# 优先级队列,put()方法接收的是一个元组(),第一个位置是优先级,第二个位置是数据
# 优先级如果是数字,直接比较数值
# 如果是字符串,是按照 ASCII 码比较的。当ASCII码相同时,会按照先进先出的原则
# concurrent.futures模块 - 进程池跟线程池
Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间。
但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对编写线程池/进程池提供了直接的支持。
threading:异步模式的线程池模块
multiprocessing:异步模块的进程池模块
# concurrent.futures模块的功能介绍
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
ThreadPoolExecutor:线程池,只提供异步调用
ProcessPoolExecutor: 进程池,只提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.
#基本方法
submit(fn, *args, **kwargs) # 异步提交任务
# 比如:submit(func, i)
# fun:调用的函数
# i :要转递给函数的参数
map(func, *iterables, timeout=None, chunksize=1) # 取代for循环submit的操作
# 是map的方式提交任务,结果是一个生成器
# 比如:map(func,range(1000))
shutdown(wait=True) # 相当于进程池的pool.close()+pool.join()操作
# wait=True,默认值就是True
# wait=True,等待池内所有任务执行完毕回收完资源后才继续
# wait=False,立即返回,并不会等待池内的任务执行完毕
# 但不管wait参数为何值,整个程序都会等到所有任务执行完毕
# submit和map必须在shutdown之前
result(timeout=None) # 取得结果
add_done_callback(fn) # 回调函数 fn:代表要调用的方法
# 多线程使用submit()方法传递多个值
te.submit(函数,值,值)
te.submit(core.server_user,i,logger)
# 实验 - 使用线程池来计算平方求和
from concurrent.futures import ThreadPoolExecutor
import time
def a(i):
su = 0
for s in range(i):
su += i ** 2
return su
te = ThreadPoolExecutor(20) ## 线程池的等待线程为20个
start = time.time()
te_l = []
for i in range(1000):
te_a = te.submit(a,i) ## 开启线程,该线程要执行完把结果返回到te_a
te_l.append(te_a) ## 返回到te_a后,还要等把te_a的值写入到te_l中,之后该线程才可以接下一个任务
te.shutdown() ## 等待线程全部执行完毕
[print(i.result()) for i in te_l] ## 注意,得到的结果是一个内存地址,想要得到真正的结果要使用.result()来获取
print('消耗时间为%s'%(time.time() - start))
执行结果:
997002999
消耗时间为0.2991969585418701
## 结果只取最后二行
# 实验 - 使用进程池来计算平方求和
from concurrent.futures import ProcessPoolExecutor
import time
def a(i):
su = 0
for s in range(i):
su += i ** 2
return su
if __name__ == '__main__':
pe = ProcessPoolExecutor(20) ## 进程池的等待线程为20个
start = time.time()
pe_l = []
for i in range(1000):
pe_a = pe.submit(a,i) ## 开启进程,该线程要执行完把结果返回到te_a
pe_l.append(pe_a) ## 返回到te_a后,还要等把te_a的值写入到te_l中,之后该线程才可以接下一个任务
pe.shutdown() ## 等待进程全部执行完毕
[print(i.result()) for i in pe_l] ## 注意,得到的结果是一个内存地址,想要得到真正的结果要使用.result()来获取
print('消耗时间为%s'%(time.time() - start))
执行结果:
997002999
消耗时间为4.066568613052368
## 结果只取最后二行
# 实验 - 使用线程池来计算平方求和 - map方法
from concurrent.futures import ThreadPoolExecutor
import time
def a(i):
su = 0
for s in range(i):
su += i ** 2
return su
te = ThreadPoolExecutor(20) ## 线程池的等待线程为20个
start = time.time()
te_a = te.map(a,range(1000)) ## 使用map方法来调用线程,分配1000个任务,让线程循环启动执行
te.shutdown() ## 等待线程全部执行完毕
[print(i) for i in te_a] ## 生成的结果,是一个生成器,可以直接for循环取结果
print('消耗时间为%s'%(time.time() - start))
执行结果:
997002999
消耗时间为0.31017208099365234
## 结果只取最后二行
# 实验 - 使用线程池来计算平方求和 - 回调函数
from concurrent.futures import ThreadPoolExecutor
import time
def a(i):
su = 0
for s in range(i):
su += i ** 2
return su
def b(su):
print("回调函数:",su.result()) ## 注意,得到的结果是一个内存地址,想要得到真正的结果要使用.result()来获取
te = ThreadPoolExecutor(20) ## 线程池的等待线程为20个
start = time.time()
for i in range(1000):
te.submit(a,i).add_done_callback(b) ## 开启线程,设置返回结果给回调函数要调用的函数
te.shutdown() ## 等待线程全部执行完毕
print('消耗时间为%s'%(time.time() - start))
执行结果:
回调函数: 997002999
消耗时间为0.35057926177978516
## 结果只取最后二行
# 线程池中的回调函数是谁在调用?
# 线程池中的回调函数是子线程调用的,和父线程没有关系
# 进程池中的回调函数是父进程调用的,和子进程没有关系
# 实验 - 证明线程的回调函数是谁调用的
from threading import current_thread ## current_thread方法,查看线程对象信息,要使用这方法来证明
from concurrent.futures import ThreadPoolExecutor
import time
def func(i):
sum = 0
sum += i
time.sleep(1)
print('这是在子线程中',current_thread()) ## 打印子线程的线程对象信息
return sum
def call_back(sum):
time.sleep(1)
print('这是在回调函数中',sum.result(),current_thread()) ## 打印回调函数的线程对象信息
if __name__ == '__main__':
t = ThreadPoolExecutor(5)
for i in range(1):
t.submit(func,i).add_done_callback(call_back)
t.shutdown()
print('这是在主线程中',current_thread()) ## 打印主线程的线程对象信息
执行结果:
这是在子线程中 <Thread(ThreadPoolExecutor-0_0, started daemon 15812)>
这是在回调函数中 0 <Thread(ThreadPoolExecutor-0_0, started daemon 15812)>
这是在主线程中 <_MainThread(MainThread, started 15548)>
## 这样就可以看出来,在线程中回调函数是那个调用的
## 简洁一下结果:
这是在子线程中 (ThreadPoolExecutor-0_0, started daemon 15812)
这是在回调函数中 (ThreadPoolExecutor-0_0, started daemon 15812)
这是在主线程中 (MainThread, started 15548)
## 这样更容易看出来
# 总结
针对计算密集的程序来说
- 不管是Pool的进程池还是ProcessPoolExecutor()的进程池,执行效率相当
- ThreadPoolExecutor 的效率要差很多
- 所以 当计算密集时,使用多进程。
不同的方式提交多个任务(for+submit 或者 map),拥有不同的拿结果的方式
- 如果是 for+submit 的方式提交任务,拿结果用 result 方法
- 如果是 map 的方式提交任务,结果是一个生成器,采用__next__()的方式去拿结果,或for循环
线程池中的回调函数是谁在调用:
- 线程池中的回调函数是子线程调用的,和父线程没有关系
- 进程池中的回调函数是父进程调用的,和子进程没有关系