Python 与线程 线程是进程的执行单元,对于大多数程序来说,可能只有一个主线程,但是为了能够提高效率,有些程序会采用多线程,在系统中所有的线程看起来都是同时执行的,例如,现在的多线程网络下载程序中,就使用了这种线程并发的特性,程序将欲下载的文件分成多个部分,然后同时进行下载,从而加快速度.虽然线程并不是一个容易掌握和使用的概念,但是如果运用得当,还是可以获得很不错的性能的.
◆创建使用线程◆ 在 Python 中创建线程需要用到一个类,threading
类,其类的实现方法是底层调用了C语言的原生函数来实现的创建线程,创建线程有两种方式,一种是直接使用函数创建线程,另一种则是使用类创建线程,两种创建方式效果是相同的,但是需要注意一点,在使用类的方式创建线程的时候,默认执行run(self)方法
,且此函数名称必须是run不能修改,接下来看3个小例子吧.
使用函数创建线程: 通过线程模块创建线程,并传递参数即可实现直接对指定函数实现多线程.
import osimport threadingimport timedef MyThread (x,y ): print ("传递的数据:%s,%s" %(x,y)) time.sleep(5 ) for x in range (10 ): thread = threading.Thread(target=MyThread,args=(x,x+1 ,)) thread.start()
使用类创建线程: 通过定义类,传递给类中一些参数,然后启动多线程,这种方式不常用.
import osimport threadingimport timeclass MyThread (threading.Thread): def __init__ (self,x,y ): super (MyThread,self).__init__() self.x = x self.y = y def run (self ): print ("运行线程, X=%s Y=%s" %(self.x,self.y)) for i in range (10 ): obj = MyThread(i,i+10 ) obj.start()
import paramiko,datetime,threadingclass MyThread (threading.Thread): def __init__ (self,address,username,password,port,command ): super (MyThread, self).__init__() self.address = address self.username = username self.password = password self.port = port self.command = command def run (self ): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try : ssh.connect(self.address, port=self.port, username=self.username, password=self.password, timeout=1 ) stdin, stdout, stderr = ssh.exec_command(self.command) result = stdout.read() if not result: self.result = stderr.read() ssh.close() self.result = result.decode() except Exception: self.result = "0" def get_result (self ): try : return self.result except Exception: return None ThreadPool = [] starttime = datetime.datetime.now() for item in range (5 ): obj = MyThread("192.168.1.20" ,"root" ,"123" ,"22" ,"ifconfig" ) ThreadPool.append(obj) for item in ThreadPool: item.start() item.join() for item in ThreadPool: ret = item.get_result() print (ret) endtime = datetime.datetime.now() print ("程序开始运行:{} 结束:{}" .format (starttime,endtime))
接收线程返回结果: 我们可以使用join
方法,等待线程执行完毕后的返回结果.
import osimport threadingimport timedef MyThread (x,y ): print ("传递的数据:%s,%s" %(x,y)) time.sleep(5 ) return "ok" temp=[] for x in range (10 ): thread = threading.Thread(target=MyThread,args=(x,x+1 ,)) thread.start() temp.append(thread) for y in temp: y.join() print ("线程: %s" %y)
◆线程锁与信号◆ 由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以就出现了线程锁的概念,即在同一时刻只允许一个线程执行操作,在这里我们选择使用Rlock,而不使用Lock,因为Lock如果多次获取锁的时候会出错,而RLock允许在同一线程中被多次acquire,但是需要用n次的release才能真正释放所占用的琐,一个线程获取了锁在释放之前,其他线程只有等待线程结束后在进行操作.
全局锁(Lock): 添加本全局锁以后,能够保证在同一时间内保证只有一个线程具有权限.
import timeimport threadingnum = 0 thread_list = [] lock = threading.Lock() def SumNumber (): global num time.sleep(2 ) lock.acquire() num += 1 lock.release() for x in range (50 ): thread = threading.Thread(target=SumNumber) thread.start() thread_list.append(thread) for y in thread_list: y.join() print ("计算结果: " ,num)
递归锁(RLock): 递归锁和全局锁差不多,递归锁就是在大锁中还要添加个小锁,递归锁是常用的锁.
import threadingimport timenum = 0 lock = threading.RLock() def fun1 (): lock.acquire() global num num += 1 lock.release() return num def fun2 (): lock.acquire() res = fun1() print ("计算结果: " ,res) lock.release() if __name__ == "__main__" : for x in range (10 ): thread = threading.Thread(target=fun2) thread.start() while threading.active_count() != 1 : print (threading.active_count()) else : print ("所有线程运行完成..." ) print (num)
互斥锁(Semaphore): 同时允许一定数量的线程更改数据,也就是限制每次允许执行的线程数.
import threading,timenum = 0 semaphore = threading.BoundedSemaphore(5 ) def run (n ): semaphore.acquire() time.sleep(1 ) print ("运行这个线程中: %s" %n) semaphore.release() if __name__ == '__main__' : for i in range (20 ): t = threading.Thread(target=run, args=(i,)) t.start() while threading.active_count() != 1 : pass else : print ('----所有线程执行完毕了---' ) print (num)
◆线程驱动事件◆ 事件驱动(Event): 线程事件用于主线程控制其他线程的执行,事件主要提供了三个方法set、wait、clear、is_set
,分别用于设置检测和清除标志.
事件处理机制定义:全局定义了一个”Flag”,如果”Flag”值为False,那么当程序执行event.wait 方法
时就会阻塞,如果”Flag”值为True,那么在执行event.wait 方法
时便不再阻塞,变成可执行模式,总体来说需要了解以下四个方法.
clear:将”Flag”设置为False set:将”Flag”设置为True wait:检测当前”Flag”,如果”Flag”值为 False,那么当线程执行 event.wait 方法时就会阻塞,如果”Flag”值为True,那么event.wait 方法时便不再阻塞 is_set:检测当前的状态,是否阻塞
import threadingevent = threading.Event() def func (x,event ): print ("函数被执行了: %s 次.." %x) event.wait() print ("加载执行结果: %s" %x) for i in range (10 ): thread = threading.Thread(target=func,args=(i,event,)) thread.start() print ("当前状态: %s" %event.is_set()) event.clear() temp=input ("输入yes: " ) if temp == "yes" : event.set () print ("当前状态: %s" %event.is_set())
定时器(Timer): 指定定时器,作用是让进程或者是指定函数,在n秒后执行相应的操作.
import threadingimport timedef func (): print ("hello python" ) for i in range (5 ): thread = threading.Timer(5 ,func) thread.start()
## Python 与进程
直观地说,进程就是正在执行的程序,进程是多任务操作系统中执行任务的基本单元,是包含了程序指令和相关资源的集合,线程的上一级就是进程,进程可包含很多线程,进程和线程的区别是进程间的数据不共享,多进程也可以用来处理多任务,不过多进程很消耗资源,计算型的任务最好交给多进程来处理,IO密集型最好交给多线程来处理,此外进程的数量应该和cpu的核心数保持一致.
进程与线程的区别,有以下几种解释:
● 新创建一个线程很容易,新创建一个进程需要复制父进程 ● 线程共享创建它的进程的地址空间,进程有自己的地址空间 ● 主线程可以控制相当大的线程在同一进程中,进程只能控制子进程 ● 线程是直接可以访问线程之间的数据,进程需要复制父进程的数据才能访问 ● 主线程变更可能会影响进程的其他线程的行为,父进程的变化不会影响子进程 ● 线程可以直接与其他线程的通信过程,进程必须使用进程间通信和同胞交流过程
◆创建一个进程◆ 通常情况下,创建一个进程需要使用multiprocessing 模块
,具体的创建方法和上面的线程创建方法相同,唯一的不同是关键字的变化,但需要注意的是,由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销,其他使用方法和线程threading.Thread
是一样的,如下介绍两个创建进程例子.
创建进程(1): 通过使用multiprocessing库
,循环创建5个进程,并使用join等待进程执行完毕.
import multiprocessingimport timedef func (name ): time.sleep(2 ) print ("hello" ,name) if __name__ == "__main__" : for i in range (5 ): proc = multiprocessing.Process(target=func,args=("lyshark" ,)) proc.start() proc.join()
创建进程(2): 创建5个进程,并在每个进程里启动1个线程,线程打印出线程的ID号.
import multiprocessingimport threadingimport timedef thread_run (): print ("子线程->子线程ID: %s" %threading.get_ident()) def func (num ): time.sleep(2 ) print ("-------------------------------->>> 主线程->主线程ID %s" %num) for i in range (5 ): thread = threading.Thread(target=thread_run,) thread.start() if __name__ == "__main__" : for i in range (5 ): proc = multiprocessing.Process(target=func,args=(i,)) proc.start()
◆进程数据共享◆ 一般当我们创建两个进程后,进程各自持有一份数据,默认无法共享数据,如果我们想要共享数据必须通过一个中间件来实现数据的交换,来帮你把数据进行一个投递,要实现进程之间的数据共享,其主要有以下几个方法来实现进程间数据的共享,queues,Array,Manager.dict,pipe
这些方法都能实现数据共享,下面将举几个小例子进行说明.
共享队列(Queue): 这个Queue主要实现进程与进程之间的数据共享,与线程中的Queue不同.
from multiprocessing import Processfrom multiprocessing import queuesimport multiprocessing def foo (i,arg ): arg.put(i) print ('say hi' ,i,arg.qsize()) li = queues.Queue(20 ,ctx=multiprocessing) for i in range (10 ): p = Process(target=foo,args=(i,li,)) p.start()
共享整数(int): 整数之间的共享,只需要使用multiprocessing.Value
方法,即可实现.
import multiprocessingdef func (num ): num.value = 1024 print ("函数中的数值: %s" %num.value) if __name__ == "__main__" : num = multiprocessing.Value("d" ,10.0 ) print ("这个共享数值: %s" %num.value) for i in range (5 ): num = multiprocessing.Value("d" , i) proc = multiprocessing.Process(target=func,args=(num,)) proc.start() print ("最后打印数值: %s" %num.value)
共享数组(Array): 数组之间的共享,只需要使用multiprocessing.Array
方法,即可实现.
import multiprocessingdef func (ary ): ary[0 ]=100 ary[1 ]=200 ary[2 ]=300 ''' i所对应的类型是ctypes.c_int,其他类型如下参考: 'c': ctypes.c_char, 'u': ctypes.c_wchar, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'f': ctypes.c_float, 'd': ctypes.c_double ''' if __name__ == "__main__" : ary = multiprocessing.Array("i" ,[1 ,2 ,3 ]) for i in range (5 ): proc = multiprocessing.Process(target=func,args=(ary,)) print (ary[:]) proc.start()
共享字典(dict): 通过使用Manager()方法
,实现两个进程中的,字典与列表的数据共享.
import multiprocessingdef func (mydict, mylist ): mydict["字典1" ] = "值1" mydict["字典2" ] = "值2" mylist.append(1 ) mylist.append(2 ) mylist.append(3 ) if __name__ == "__main__" : mydict = multiprocessing.Manager().dict () mylist = multiprocessing.Manager().list () proc = multiprocessing.Process(target=func,args=(mydict,mylist)) proc.start() proc.join() print ("列表中的元素: %s" %mylist) print ("字典中的元素: %s" %mydict)
管道共享(Pipe): 通过Pipe
管道的方式在两个进程之间共享数据,类似于Socket套接字.
import multiprocessingdef func (conn ): conn.send("你好我是子进程." ) print ("父进程传来了:" ,conn.recv()) conn.close() if __name__ == "__main__" : parent_conn,child_conn = multiprocessing.Pipe() proc = multiprocessing.Process(target=func,args=(child_conn,)) proc.start() print ("子进程传来了:" ,parent_conn.recv()) parent_conn.send("我是父进程,收到消息了.." )
进程锁(Lock): 进程中也有锁,可以实现进程之间数据的一致性,也就是进程数据的同步,保证数据不混乱.
import multiprocessingdef func (loc,num ): loc.acquire() print ("hello ---> %s" %num) loc.release() if __name__ == "__main__" : lock = multiprocessing.Lock() for number in range (10 ): proc = multiprocessing.Process(target=func,args=(lock,number,)) proc.start()
◆进程的进程池◆ 进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止,进程池有两个方法:apply(),apply_async()
,下面将介绍几个常用的小技巧.
进程池(apply): 同步执行,每次执行一个进程,直到所有进程执行完毕,其实也就是串行执行.
import multiprocessingimport timedef foo (num ): time.sleep(2 ) print ("进程执行-->: %s" %num) if __name__ == "__main__" : pool = multiprocessing.Pool(processes=5 ) for i in range (10 ): pool.apply(func=foo,args=(i,)) print ("ends ..." ) pool.close() pool.join()
进程池(apply_async): 异步执行进程,每次执行5个进程,直到执行完10次循环位置,并行执行.
import multiprocessingimport timedef foo (num ): time.sleep(2 ) print ("进程执行-->: %s" %num) def bar (arg ): print ("call back 函数执行.." ) if __name__ == "__main__" : pool = multiprocessing.Pool(processes=5 ) for i in range (10 ): pool.apply_async(func=foo,args=(i,),callback=bar) print ("ends ..." ) pool.close() pool.join()
## Python 与协程
协程,又称微线程,是一种用户态的轻量级线程,携程主要实现了在单线程下实现并发,一个线程能够被分割成多个协程,协程拥有自己的寄存器上下文和栈,协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,因此协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态.
线程和进程的操作是由程序触发系统接口,最后的执行者是系统,协程的操作则是程序员,协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时,而协程则只使用一个线程,在一个线程中规定某个代码块执行顺序,协程的适用场景:当程序中存在大量不需要CPU的操作时(IO操作),时适用于协程.
协程之(Yield): 通过使用yield方法来模拟实现协程操作的例子,这里只是演示.
import timeimport queuedef consumer (name ): print ("--->包子..." ) while True : new_yield = yield print ("[%s] 在吃包子 %s" % (name, new_yield)) def producer (): r = con.__next__() r = con2.__next__() n = 0 while n < 5 : n += 1 con.send(n) con2.send(n) print ("\033[32;1m[producer]\033[0m 生产包子.. %s" % n) if __name__ == '__main__' : con = consumer("admin" ) con2 = consumer("lyshark" ) p = producer()
协程之(Greenlet): Greenlet协程模块,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator,但是仍然需要手动切换.
from greenlet import greenletdef master (): print ("主程序执行..." ) green2.switch() print ("主程序再次执行..." ) green2.switch() def slaves (): print ("子程序执行...." ) green1.switch() print ("子程序再次执行..." ) green1 = greenlet(master) green2 = greenlet(slaves) green1.switch()
协程之(Gevent): Gevent是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,它是以C扩展模块形式接入Python的轻量级协程.
import geventdef func1 (): print ("函数 func1 开始..." ) gevent.sleep(3 ) print ("函数 func1 结束..." ) def func2 (): print ("函数 func2 开始..." ) gevent.sleep(1 ) print ("函数 func2 结束..." ) def func3 (): print ("函数 func3 开始..." ) gevent.sleep(0 ) print ("函数 func3 结束..." ) gevent.joinall([ gevent.spawn(func1), gevent.spawn(func2), gevent.spawn(func3), ])
协程实现爬虫: 通过使用Gevent模块,实现批量爬取指定页面并返回页面的大小.
from gevent import monkeymonkey.patch_all() import geventfrom urllib.request import urlopendef func (url ): print ("获取页面: %s" %url) resp = urlopen(url) data = resp.read() print ("%s URL大小为= %d bytes" %(url,len (data))) gevent.joinall([ gevent.spawn(func, 'https://www.python.org/' ), gevent.spawn(func, 'https://www.yahoo.com/' ), gevent.spawn(func, 'https://github.com/' ), ])
并发Socket(服务端): 在单线程下实现多Socket并发,服务端代码如下.
import sysimport socketimport timeimport geventfrom gevent import socket,monkeymonkey.patch_all() def server (port ): s = socket.socket() s.bind(('0.0.0.0' , port)) s.listen(500 ) while True : cli,addr = s.accept() gevent.spawn(handle_request,cli) def handle_request (conn ): try : while True : data = conn.recv(1024 ) print ("接收数据:" , data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as ex: print (ex) finally : conn.close() if __name__ == '__main__' : server(8001 )
并发Socket(客户端): 在单线程下实现多Socket并发,客户端代码如下.
import socketHOST = 'localhost' PORT = 8001 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True : msg = bytes (input ("输入发送的数据:" ), encoding="utf8" ) s.sendall(msg) data = s.recv(1024 ) print ('返回数据' , repr (data)) s.close()
## Python 与队列
同步队列 Queue 这是一个专门为多线程访问所设计的数据结构,能够有效地实现线程对资源的访问,程序可以通过此结构在线程间安全有效地传递数据 Queue 模块中包含一个 Queue 的类,其构造函数中可以指定一个Maxsize值,当maxszie值小于或等于0的时候,表示对队列的长度没有限制,当大于0的时候,则指定了队列的长度.当队列到达最大长度而又有新的线程过来的时候,则需要等待 Queue 类中有不少方法,但是最市要的是 put 和 get 方法,Put 方法将需要完成的任务放入队列,而 get 方法相反,从队列中获取任务,需要注意的是,在这些方法中,有些方法由于多线程的原因,返回值并不一定是准确的,例如qsize,empty等函数的统计结果.
先进先出队列: 先来介绍简单的队列例子,以及队列的常用方法的使用,此队列是先进先出模式.
import queueq = queue.Queue(5 ) print (q.empty()) q.put(1 ) q.put(2 ) q.put(3 ,block=False ,timeout=2 ) print (q.full()) print (q.qsize()) print (q.maxsize) print (q.get(block=False ,timeout=2 )) print (q.get()) q.task_done() print (q.get())q.task_done() q.join()
后进先出队列: 这个队列则是,后进先出,也就是最后放入的数据最先弹出,类似于堆栈.
>>> import queue>>> >>> q = queue.LifoQueue()>>> >>> q.put("wang" )>>> q.put("rui" )>>> q.put("ni" )>>> q.put("hao" )>>> >>> print (q.get())hao >>> print (q.get())ni >>> print (q.get())rui >>> print (q.get())wang >>> print (q.get())
优先级队列: 此类队列,可以指定优先级顺序,默认从高到低排列,以此根据优先级弹出数据.
>>> import queue>>> >>> q = queue.PriorityQueue()>>> >>> q.put((1 ,"python1" ))>>> q.put((-1 ,"python2" ))>>> q.put((10 ,"python3" ))>>> q.put((4 ,"python4" ))>>> q.put((98 ,"python5" ))>>> >>> print (q.get())(-1 , 'python2' ) >>> print (q.get())(1 , 'python1' ) >>> print (q.get())(4 , 'python4' ) >>> print (q.get())(10 , 'python3' ) >>> print (q.get())(98 , 'python5' )
双向的队列: 双向队列,也就是说可以分别从两边弹出数据,没有任何限制.
>>> import queue>>> >>> q = queue.deque()>>> >>> q.append(1 )>>> q.append(2 )>>> q.append(3 )>>> q.append(4 )>>> q.append(5 )>>> >>> q.appendleft(6 )>>> >>> print (q.pop())5 >>> print (q.pop())4 >>> print (q.popleft())6 >>> print (q.popleft())1 >>> print (q.popleft())2
生产者消费者模型: 生产者消费者模型,是各种开发场景中最常用的开发模式,以下是模拟的模型.
import queueimport threadingimport timeq = queue.Queue() def productor (arg ): while True : q.put(str (arg)) print ("%s 号窗口有票...." %str (arg)) time.sleep(1 ) def consumer (arg ): while True : print ("第 %s 人取 %s 号窗口票" %(str (arg),q.get())) time.sleep(1 ) for i in range (10 ): t = threading.Thread(target=productor,args=(i,)) t.start() for j in range (5 ): t = threading.Thread(target=consumer,args=(j,)) t1 = threading.Thread(target=consumer,args=(j,)) t.start() t1.start()