一、concurrent.futures模块
此模块提供了高度封装的异步调用接口,支持进程池异步调用(ProcessPoolExecutor)和线程池异步调用(ThreadPoolExecutor),使用方式类似于进程池pool()中的异步调用。
1、进程池异步调用
异步调用实例:
from concurrent.futures import ProcessPoolExecutorimport os,time,randomdef work(n): print('%s is running' %os.getpid()) time.sleep(random.randint(1,3)) return n**2if __name__ == '__main__': executor=ProcessPoolExecutor(4) #数字4代表限制的进程数量,不写,默认为cpu的个数 futures=[] for i in range(10): future=executor.submit(work,i) #submit为异步提交任务,返回结果对象 futures.append(future) executor.shutdown(wait=True) for obj in futures: print(obj.result()) #result()为获取结果,相当于pool中的get() print("主")
注:上例中,executor.shutdown(wait=True)相当于进程池的pool.close()+pool.join()操作,wait=True,等待池内所有任务执行完毕回收完资源后才继续;wait=False,立即返回,并不会等待池内的任务执行完毕,但不管wait参数为何值,整个程序都会等到所有任务执行完毕。
模拟同步调用实例:
from concurrent.futures import ProcessPoolExecutorimport os,time,randomdef work(n): print('%s is running' %os.getpid()) time.sleep(random.randint(1,3)) return n**2if __name__ == '__main__': executor=ProcessPoolExecutor() for i in range(10): future=executor.submit(work,i).result() print(future) executor.shutdown(wait=True) print("主")
2、线程池异步调用
线程池异步调用实例:
from concurrent.futures import ThreadPoolExecutorimport os,time,randomdef work(n): print('%s is running' %os.getpid()) time.sleep(random.randint(1,3)) return n**2if __name__ == '__main__': executor=ThreadPoolExecutor(4) #4代表开启的线程的个数,不写,默认为cpu的个数乘5 futures=[] for i in range(10): future=executor.submit(work,i) futures.append(future) executor.shutdown(wait=True) for obj in futures: print(obj.result()) print('主')
3、map方法
map(func, *iterables, timeout=None, chunksize=1) ,取代for循环submit的操作,结果为包含函数结果的一个生成器。
实例:
from concurrent.futures import ThreadPoolExecutorimport os,time,randomdef work(n): print('%s is running' %os.getpid()) time.sleep(random.randint(1,3)) return n**2if __name__ == '__main__': executor=ThreadPoolExecutor(4) futures=executor.map(work,range(10)) #可迭代对象的每一个值即为函数的参数 executor.shutdown(wait=True) print(type(futures)) #print(list(futures)) #[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] print('主')
4、回调函数
与pool中的回调函数不同的是获得函数执行完返回给解析函数的是一个结果对象,所以解析函数在执行时需要通过result()方法取到结果。具体回调格式如下代码所示:
from concurrent.futures import ProcessPoolExecutorimport os,random,timeimport requestsdef get(url): print('%s GET %s' %(os.getpid(),url)) response=requests.get(url) time.sleep(random.randint(1,3)) if response.status_code == 200: print('%s DONE %s' % (os.getpid(), url)) return { 'url':url,'text':response.text}def parse(future): dic=future.result() #返回的是一个对象 print('%s PARSE %s' %(os.getpid(),dic['url'])) time.sleep(1) res='%s:%s\n' %(dic['url'],len(dic['text'])) with open('db.txt','a') as f: f.write(res)if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=ProcessPoolExecutor(4) for url in urls: p.submit(get,url).add_done_callback(parse) p.shutdown(wait=True) print('主')
二、多线程中的同步锁
在python的多线程中GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock。所有线程抢的是GIL锁,或者说所有线程抢的是执行权限。线程1抢到GIL锁,拿到执行权限,开始执行,然后加了一把Lock,还没有执行完毕,即线程1还未释放Lock,有可能线程2抢到GIL锁,开始执行,执行过程中发现Lock还没有被线程1释放,于是线程2进入阻塞,被夺走执行权限,有可能线程1拿到GIL,然后正常执行到释放Lock。这就导致了串行运行的效果。
1、多线程无互斥锁情况开启的多线程会在极短的时间内完成,所有线程有都有可能同时拿到n=100,然后抢到GIL锁的线程在遇到阻塞后会释放锁,其他线程再去竞争GIL锁。
from threading import Threadimport os,timen=100def work(): global n temp=n time.sleep(0.1) n=temp-1if __name__ == '__main__': l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #结果可能为99
2、多线程增加互斥锁情况
from threading import Thread,Lockimport os,timedef work(): global n lock.acquire() temp=n time.sleep(0.1) n=temp-1 lock.release()if __name__ == '__main__': lock=Lock() n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全
分析:100个线程去抢GIL锁,即抢执行权限,肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire(),极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL,直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复以上的过程。
三、死锁现象与递归锁
1、死锁现象
是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁:
from threading import Thread,Lockimport timemutexA=Lock()mutexB=Lock()class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('\033[41m%s 拿到A锁\033[0m' %self.name) mutexB.acquire() print('\033[42m%s 拿到B锁\033[0m' %self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('\033[43m%s 拿到B锁\033[0m' %self.name) time.sleep(2) mutexA.acquire() print('\033[44m%s 拿到A锁\033[0m' %self.name) mutexA.release() mutexB.release()if __name__ == '__main__': for i in range(10): t=MyThread() t.start()'''Thread-1 拿到A锁Thread-1 拿到B锁Thread-1 拿到B锁Thread-2 拿到A锁然后就卡住,死锁了'''
2、递归锁 (可以acquire多次)
为解决上述死锁现象,出现了递归锁的概念:在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
from threading import Thread,RLockimport timemutexA=mutexB=RLock()class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('%s 拿到A锁' %self.name) mutexB.acquire() print('%s 拿到B锁' %self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('%s 拿到B锁' %self.name) time.sleep(2) mutexA.acquire() print('%s 拿到A锁' %self.name) mutexA.release() mutexB.release()if __name__ == '__main__': for i in range(10): t=MyThread() t.start()
分析:一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止。
四、信号量Semaphore
信号量本质上是一把锁,与互斥锁不同的是,互斥锁可以类比于家庭厕所,一次只能进去一个人,二信号量则相当于公共厕所,一次可以进去多人,具体人数就是由代码中4限制的,谁先结束,谁先释放茅坑,下一个线程或进程进去。和进程池没有关系,它会开启一堆线程或进程。
from threading import Thread,current_thread,Semaphoreimport time,randomdef work(): S.acquire() print('%s 正在上厕所'%current_thread().getName()) time.sleep(random.randint(1,3)) S.release()if __name__ == '__main__': S=Semaphore(4) #4表示最大最大的连接数为4 for i in range(10): t=Thread(target=work) t.start()
五、事件Event
同进程的一样,线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。
1、方法介绍
1.isSet()/is_set(): 返回event的状态值;2.wait(): 如果 .isSet()==False将阻塞线程;3.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;4.clear(): 恢复event的状态值为False。
2、实例展示
from threading import Event,current_thread,Threadimport timedef check(): print('%s 正在检测' %current_thread().getName()) time.sleep(3) event.set()def conn(): count=1 while not event.is_set(): if count > 3: raise TimeoutError('连接超时') print('%s 正在等待%s连接' % (current_thread().getName(),count)) event.wait(timeout=1) #设置阻塞时间 count+=1 print('%s 开始连接' % current_thread().getName())if __name__ == '__main__': event = Event() t1=Thread(target=check) t2=Thread(target=conn) t3=Thread(target=conn) t4=Thread(target=conn) t1.start() t2.start() t3.start() t4.start()
六、定时器Timer
指定n秒后,执行某操作。实例如下:
from threading import Timerdef work(): print('hello,world')if __name__ == '__main__': t=Timer(3,work)#3秒后执行work()代码 t.start()
七、线程queue
queue队列 :使用import queue,用法与进程Queue一样---queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
1、先进先出
import queueq=queue.Queue()q.put('first')q.put('second')q.put('third')print(q.get())print(q.get())print(q.get())'''结果:firstsecondthird'''
2、后进先出
import queueq=queue.LifoQueue() #last in first outq.put('first')q.put('second')q.put('third')print(q.get())print(q.get())print(q.get())'''结果:thirdsecondfirst'''
3、优先级原则
put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
import queueq=queue.PriorityQueue()q.put((20,'a'))q.put((10,'b'))q.put((30,'c'))print(q.get())print(q.get())print(q.get())'''结果(数字越小优先级越高,优先级高的优先出队):(10, 'b')(20, 'a')(30, 'c')'''