给大家整理了相关的编程文章,网友巴高昂根据主题投稿了本篇教程内容,涉及到python、线程锁、python线程锁相关知识点相关内容,已被999网友关注,内容中涉及的知识点可以在下方直接下载获取。
python线程锁相关知识点
Python线程下使用锁的技巧分享
使用诸如Lock、RLock、Semphore之类的锁原语时,必须多加小心,锁的错误使用很容易导致死锁或相互竞争。依赖锁的代码应该保证当出现异常时可以正常的释放锁。
典型代码如下:
try: lock.acquire() #关键部分 ... finally: lock.release()
另外,所有种类的锁还支持上下文管理协议(写起来更简洁):
with语句自动获取锁,并且在控制流离开上下文时自动释放锁。
with lock: #关键部分 ...
此外,编写代码时一般应该避免同时获取多个锁,例如下面就应该尽量避免:
这通知很统一导致应用程序神秘死锁,尽管与集中策略可以避免出现这种情况(如分层锁定),但是最好在编写代码时避免这种嵌套锁。
with lock_A: #关键部分 ... with lock_B: #B的关键部分 ...
尽管在Python中可以使用各种锁和同步原语的组合编写非常传统的多线程程序,但有一种首推的编程方式要优于其他所有编程方式:即将多线程程序组织为多个独立任务的集合,这些任务之间通过消息队列进行通信,例如下面要讲的queue模块。
Python多线程编程之多线程加锁操作示例
Python语言本身是支持多线程的,不像PHP语言。
下面的例子是多个线程做同一批任务,任务总是有task_num个,每次线程做一个任务(print),做完后继续取任务,直到所有任务完成为止。
# -*- coding:utf-8 -*- #! python2 import threading start_task = 0 task_num = 10000 mu = threading.Lock() ###通过工厂方法获取一个新的锁对象 class MyThread(threading.Thread): ###类MyThread继承基类threading.Thread def run(self): ##线程启动的入口函数,子类需重写 global start_task global mu global start_task while start_task < task_num: ##如果任务没有完成,则继续 if mu.acquire(): ##加锁 if start_task < task_num: print start_task start_task = start_task + 1 mu.release() ##释放锁 def test(): thread_all = [] for i in range(6): ##for循环创建6个线程 t = MyThread() ##创建线程 thread_all.append(t) t.start() ###启动线程 for i in range(6): thread_all[i].join() ##等待线程结束 if __name__ == "__main__": test()
运行上述代码,则输出1~9999
测试加锁与不加锁效果:将任务数设置为1千万或者以上,在多核机器上将print输出分别保存,就能说明问题。
python线程中同步锁详解
在使用多线程的应用下,如何保证线程安全,以及线程之间的同步,或者访问共享变量等问题是十分棘手的问题,也是使用多线程下面临的问题,如果处理不好,会带来较严重的后果,使用python多线程中提供Lock Rlock Semaphore Event Condition 用来保证线程之间的同步,后者保证访问共享变量的互斥问题
Lock & RLock:互斥锁 用来保证多线程访问共享变量的问题
Semaphore对象:Lock互斥锁的加强版,可以被多个线程同时拥有,而Lock只能被某一个线程同时拥有。
Event对象: 它是线程间通信的方式,相当于信号,一个线程可以给另外一个线程发送信号后让其执行操作。
Condition对象:其可以在某些事件触发或者达到特定的条件后才处理数据
1、Lock(互斥锁)
请求锁定 — 进入锁定池等待 — 获取锁 — 已锁定 — 释放锁
Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。
可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。
构造方法:
Lock()
实例方法:
acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。
if mutex.acquire(): counter += 1 print "I am %s, set counter:%s" % (self.name, counter) mutex.release()
2、RLock(可重入锁)
RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。
可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。
构造方法:
RLock()
实例方法:
acquire([timeout])/release(): 跟Lock差不多。
3、Semaphore(共享对象访问)
咱们再聊聊Semaphore ,说实话Semaphore是我最晚使用的同步锁,以前类似的实现,是我用Rlock实现的,相对来说有些绕,毕竟Rlock 是需要成对的锁定和开锁的》。。。
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
直接上代码,我们把semaphore控制为3,也就是说,同时有3个线程可以用这个锁,剩下的线程也之只能是阻塞等待了…
#coding:utf-8 #blog xiaorui.cc import time import threading semaphore = threading.Semaphore(3) def func(): if semaphore.acquire(): for i in range(3): time.sleep(1) print (threading.currentThread().getName() + '获取锁') semaphore.release() print (threading.currentThread().getName() + ' 释放锁') for i in range(5): t1 = threading.Thread(target=func) t1.start()
4、Event(线程间通信)
Event内部包含了一个标志位,初始的时候为false。
可以使用使用set()来将其设置为true;
或者使用clear()将其从新设置为false;
可以使用is_set()来检查标志位的状态;
另一个最重要的函数就是wait(timeout=None),用来阻塞当前线程,直到event的内部标志位被设置为true或者timeout超时。如果内部标志位为true则wait()函数理解返回。
import threading import time class MyThread(threading.Thread): def __init__(self, signal): threading.Thread.__init__(self) self.singal = signal def run(self): print "I am %s,I will sleep ..."%self.name self.singal.wait() print "I am %s, I awake..." %self.name if __name__ == "__main__": singal = threading.Event() for t in range(0, 3): thread = MyThread(singal) thread.start() print "main thread sleep 3 seconds... " time.sleep(3) singal.set()
5、Condition(线程同步)
可以把Condition理解为一把高级的琐,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。threadiong.Condition在内部维护一个琐对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。Condition也提供了acquire, release方法,其含义与琐的acquire, release方法一致,其实它只是简单的调用内部琐对象的对应的方法而已。Condition还提供了如下方法(特别要注意:这些方法只有在占用琐(acquire)之后才能调用,否则将会报RuntimeError异常。):
Condition.wait([timeout]):
wait方法释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有琐的时候,程序才会继续执行下去。
Condition.notify():
唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的琐。
Condition.notify_all()
Condition.notifyAll()
唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的琐。
对于Condition有个例子,大家可以观摩下。
from threading import Thread, Condition import time import random queue = [] MAX_NUM = 10 condition = Condition() class ProducerThread(Thread): def run(self): nums = range(5) global queue while True: condition.acquire() if len(queue) == MAX_NUM: print "Queue full, producer is waiting" condition.wait() print "Space in queue, Consumer notified the producer" num = random.choice(nums) queue.append(num) print "Produced", num condition.notify() condition.release() time.sleep(random.random()) class ConsumerThread(Thread): def run(self): global queue while True: condition.acquire() if not queue: print "Nothing in queue, consumer is waiting" condition.wait() print "Producer added something to queue and notified the consumer" num = queue.pop(0) print "Consumed", num condition.notify() condition.release() time.sleep(random.random()) ProducerThread().start() ConsumerThread().start()