threading模块、线程锁、信号量、Event、Queue队列

鳄鱼君

发表文章数:642

Vieu四代商业主题

高扩展、安全、稳定、响应式布局多功能模板。

¥69 现在购买
首页 » Python » threading模块、线程锁、信号量、Event、Queue队列

threading模块

Python中开启多线程需要使用threading模块,具体代码参考:

import threading,time
def run(n):
    print('task',n)
    time.sleep(2)

if __name__ == '__main__':
    t1 = threading.Thread(target=run, args=('a1',))  # 一个参数需要加上,不加的话就是两个位置参数了
    t2 = threading.Thread(target=run, args=('a2',))
    t1.start()
    t2.start()
    普通函数调用
    # run('t1')
    # run('t2')

我们创建了两个线程来运行run函数,每执行一次就会睡2s,但是我们采用多线程,让程序看起来是并行的,两个线程同时执行,然后等待2s结束程序。我们也可以像执行普通的函数一样执行以下试试看有什么不同。

继承式调用多线程:

import threading,time
class MyThread(threading.Thread):
    def __init__(self,n):
        super(MyThread,self).__init__()#继承父类的构造方法
        # threading.Thread.__init__(self)
        self.n=n
    def run(self):#函数必须为run
        print('task',self.n)
        time.sleep(2)
        
if __name__ == '__main__':
    t1 = MyThread("a1")
    t2 = MyThread("a2")
    t1.start()
    t2.start()

启动多个线程该:

import threading,time
def run(n):
    print('task',n)
    time.sleep(2)
start_time=time.time()
for i in range(60):
    t=threading.Thread(target=run,args=('a---->%s' % i,))
    t.start()

print('all tasks completed..........')
print('cost:%s'% (time.time() - start_time))

我们启动了60个线程,60个线程执行完毕之后,我们打印了一句”all tasks completed” ,接着打印了运行60个线程所花费的时间。由于是多线程主线程(程序本身)不会等待子线程执行完毕(主线程跟子线程是并行的),所以你现在计算的时间只是主线程运行的时间,而不是run函数里面的2s。

等待子线程的执行结果:

使用join()可以等待子线程执行完毕:

import threading,time
def run(n):
    print('task',n)
    time.sleep(2)
start_time=time.time()
thread_list=[] #定义一个空列表,加入子线程
for i in range(50):
    t=threading.Thread(target=run,args=('a---->%s' % i,))
    t.start()
    thread_list.append(t) #
    #t.join() #这样的话就是2s启动一个线程 也就是串行

for t in thread_list: # 循环线程列表,等待所有的子线程执行完毕
    t.join()
print('all tasks completed..........')
print('cost:%s'% (time.time() - start_time))

程序本身是主线程,怎么证明呢?

import threading,time
def run(n):
    print('task',n,threading.current_thread(),threading.active_count())
    time.sleep(2)

start_time=time.time()
thread_list=[]
for i in range(60):
    t = threading.Thread(target=run, args=('a---->%s' % i,))  # 一个参数需要加上,不加的话就是两个位置参数了
    t.start()
    thread_list.append(t)
for t in thread_list:
    t.join()

print('all tasks completed.....',threading.current_thread(),threading.active_count())
print('cost:%s' % (time.time() - start_time))
  1. threading.current_thread():当前进程的类型
  2. threading.active_count():当前活跃的进程数量

守护线程:当主线程退出时,其它子线程会同时退出,不管是否执行完任务。

import threading,time
def run(n):
    print('task',n)
    time.sleep(2)
    print('task done',n)

start_time=time.time()
thread_list=[]
for i in range(60):
    t = threading.Thread(target=run, args=('a---->%s' % i,))  # 一个参数需要加上,不加的话就是两个位置参数了
    t.setDaemon(True) # 把当前线程设置为守护线程

    t.start()
    thread_list.append(t)

print('all tasks completed.....')
print('cost:%s' % (time.time() - start_time))

将60个线程设置为守护线程,守护这主线程,主线程执行完毕,所有的子线程都会退出,所以你不会看到子线程打印task done。


子线程之间的数据可以交换,那么我们可以通过下面的代码来测试一下:

import threading,time
num=0 # 定义一个全局变量
def run():
    global num # 修改num的值
    num+=1 #每个线程执行就会加1
    time.sleep(2)
thread_list=[]
for i in range(100):
    t=threading.Thread(target=run,args=())
    t.start()
for t in thread_list: # 等待所有的子线程执行完毕
    t.join()
print(num)

代码启动了100个子线程,对num进行加1操作, 由于是多线程是并发同时运行的,所以这100个线程很有可能同时拿走了num=0这个初始变量交给cpu去运算,当A线程处理完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99,这是我们分析的运行方式,但实际上代码返回的num值为100。总之你只需要知道,多个线程修改公共数据都需要设置用户锁就完事了!

每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。

import threading,time
lock=threading.Lock()#生成一把锁的实例
num=0
def run():
    lock.acquire()
    global num
    num+=1
    lock.release()
    time.sleep(2)
    
thread_list=[]
for i in range(100):
    t=threading.Thread(target=run,args=())
    t.start()

for t in thread_list:
    t.join()
print(num)

继承式加锁:

import threading,time
class myThread(threading.Thread):
    def __init__(self,threadID,name,counter):
        threading.Thread.__init__(self)
        self.threadID=threadID
        self.name=name
        self.counter=counter
    def run(self):
        print('starting'+self.name)
        #获得锁,成功获得锁定后返回True
        #可选的timeout参数不填时将一直阻塞直到获得锁定
        #否则超时后将反会False
        threadLock.acquire()
        print_time(self.name,self.counter,3)
        threadLock.release()
def print_time(threadName,delay,counter):
    while counter:
        time.sleep(delay)
        print('%s:%s'%(threadName,time.ctime(time.time())))
        counter-=1

threadLock=threading.Lock()
threads=[]
#创建新线程
thread1=myThread(1,'Thread-1',1)
thread2=myThread(2,'Thread-2',2)
#开启新线程
thread1.start()
thread2.start()
threads.append(thread1)
threads.append(thread2)
#等待所有线程完成
for i in threads:
    i.join()
print('all thread is ')

Rlock递归锁

上面讲过我们可以给线程加上一把锁,那么也可以加上多把锁,锁太多那么程序也会搞混的:

import threading,time
num1,num2=0,0
lock=threading.Lock()
#lock=threading.RLock() #解决递归锁
def run1():
    print('this is the first lock')
    lock.acquire()
    global num1
    num1+=1
    lock.release()
    return num1
def run2():
    print('this is the second lock')
    lock.acquire()
    global num2
    num2+=1
    lock.release()
    return  num2
def run3():  #相当于大门的总锁
    lock.acquire() #生成一个锁
    fun1=run1() #执行run1,里面有小锁
    print('-----between run1 and run2------')
    fun2=run2()#执行run2 ,里面有小锁
    lock.release() #释放一个锁
    print(fun1,fun2)
for i in range(20):
    t=threading.Thread(target=run3)
    t.start()
while threading.active_count() !=1: #判断当前线程的数量不等于1
    print(threading.active_count())
else:
    print('all threads done')
    print(num1,num2)

运行上面的程序会卡死,一直运行,出不来的呢种感觉,方法已经注释过了,自己尝试一下。

信号量Semaphore

互斥锁允许同一时间有一个线程在更改数据,但是Semaphore可以允许同时有一定数量的线程更改数据。

import threading,time
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print(' the thread %s is running....\n'% n)
    semaphore.release()
if __name__=='__main__':
    semaphore=threading.BoundedSemaphore(5) #最多允许5个线程同时运行
    for i in range(20):
        t=threading.Thread(target=run,args=(i,))
        t.start()
while threading.active_count() !=1:
    pass
else:
    print('all threads is done........')

执行返回的结果就是5个5个执行,但是顺序不一样,这就是多线程的特点,它不会等5个都执行完在生成5个,而是5个有一个执行完成就在加入一个。可以用到线程池,支持多并发。

Events

事件是一个简单的同步对象;事件表示内部标志和线程可以等待旗子被设置,或者自己设置或清除旗子。我这说的是啥呐,我自己也不知道,看下面代码

主要方法:

  • event.wait()客户端线程可以等待设置标志
  • event.set()服务器线程可以设置标志
  • event.clear()服务器线程可以重置标志
  • event.is_set()判断是否设置标志

如果设置了该标志,那么wait方法将不执行任何操作。如果标志被清除,wait将被阻塞,直到它再次被设置。任意数量的线程可以等待同一个事件。

import threading,time
event=threading.Event()
def lamp():
    count=0
    event.set()#设置标志,假设设置为绿灯
    while True: #不断的循环
        if count>5 and count<10: #判断count在5和10之间,清除标志
            event.clear() #重置标志位
            print('\033[41;1mred lamp is light...\033[0m')#红灯
        elif count>10:#count超过10的话,count清零,设置标志
            event.set() #设置标志,假设变为绿灯
            count=0
        else:print('\033[42;1m green lamp is light..\033[0m')#count小于5是绿灯状态
        time.sleep(1)
        count+=1
def car(name): #定义一个车,启动多线程,按照标志物的不同来运行不同的结果
    while True:
        if event.is_set(): #代表绿灯
            print('[%s] is running ...' % name)
            time.sleep(1)
        else:
            print('[%s] is red lamp,  wait ...'% name)
            event.wait()
            print('\033[42;1m green lamp is light, running.....\033[0m')
lamp_thread=threading.Thread(target=lamp,)
lamp_thread.start()
car_thread=threading.Thread(target=car,args=('baoma',))
car_thread.start()

跟我们开车遇到红绿灯一样,红灯等待,绿灯运行,红绿灯就是标志物,我们就可以使用event的相关方法来设置。代码中两段是设置字体颜色的,呢不红绿灯吗,为了更加的突出标志物。

Queue队列

当必须在多个线程之间安全地交换信息时,队列在线程编程中特别有用。队列可以使程序解耦,提高运行效率。

class queue.Queue(maxsize=0) #先入先出

class queue.LifoQueue(maxsize=0) #最后进的先出
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列,按照设置的顺序

优先队列的构造函数。maxsize是一个整数,它设置可以放置在队列中的项数的上限。一旦达到此大小,插入将阻塞,直到队列项被使用。如果maxsize小于或等于0,则队列大小为无穷大。

queue.Queue

import queue
q=queue.Queue()
q.put("数据1") #放入数据1,2,3
q.put("数据2")
q.put("数据3")
print(q.qsize()) #打印放入数据的大小
print(q.get()) #按照放入的顺序,先进先出
print(q.get()) #按照放入的顺序,先进先出
print(q.get()) #按照放入的顺序,先进先出
#print(q.get(timeout=1)) #同样你也可以设置超时时间,超过时间就会抛出异常
print(q.get_nowait()) #如果超出放入的数量,就会卡住,处于一直等待状态,可以使用q.get_nowait()终断抛出异常
  1. Queue.qsize():返回队列的大致大小
  2. Queue.empty():如果为空则返回True
  3. Queue.full():如果已满,返回True
  4. Queue.put(item, block=True, timeout=None):将数据itme放入队列
  5. Queue.get(block=True, timeout=None):从队列中删除一个item,然后取出。 阻塞/当取不出来。设置时间限制。
  6. Queue.put_nowait(item) 等同于 put(item, False).

将项目放入队列。如果可选的args块为true, timeout为None(缺省值),则在空闲槽可用之前,必要时进行块处理。如果timeout是一个正数,那么它将阻塞最多的超时秒数,如果在这段时间内没有可用的空闲插槽,则引发完整的异常。否则(block为false),如果空闲插槽立即可用,则将一个项放到队列中,否则将引发完全异常(在这种情况下忽略timeout)。

queue.LifoQueue

import queue
q=queue.LifoQueue()
q.put('数据1')
q.put('数据2')
q.put('数据3')
print(q.get())
print(q.get())
print(q.get())#返回结果就是最后放进去的数据先取出来
#print(q.get(False)) #没有数据就会一直处于阻塞状态
#print(q.get(timeout=1))

queue.PriorityQueue

import queue
q=queue.PriorityQueue()
q.put((10,'数据1'))
q.put((-10,'数据2'))
q.put((20,'数据3'))
print(q.get())
print(q.get())
print(q.get())

我们可以启动一个线程,向队列添加任务,再启动一个线程从队列取任务。呢我使用列表不是也可以完成,启动一个线程向列表仍数据,另一个线程再取不就完了。区别:队列取到数据,原队列就不存在;列表只能循环取,原列表还存在。

生产者消费者模型:

import threading
import queue

def producer():
    for i in range(10):
        q.put("任务 %s" % i)
        
    q.join()
    print("所有任务已完成...")

def consumer(n):
    while q.qsize() > 0:
        print(" %s 收到" % n, q.get())
        q.task_done()  # 告知这个任务执行完了

q = queue.Queue()

p = threading.Thread(target=producer, )
p.start()

c1 = consumer("线程1")

未经允许不得转载:作者:鳄鱼君, 转载或复制请以 超链接形式 并注明出处 鳄鱼君
原文地址:《threading模块、线程锁、信号量、Event、Queue队列》 发布于2020-01-07

分享到:
赞(0) 赏杯咖啡

评论 抢沙发

7 + 7 =


文章对你有帮助可赏作者一杯咖啡

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.6主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
切换注册

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录
切换登录

注册