线程同步是指在多线程并发执行的场景中,保证各个线程协作正确可靠的一种机制。在Python中,我们通常使用锁(Lock)和条件变量(Condition)两种机制进行线程同步。
一、锁(Lock)
1.1 什么是锁
锁是一种线程同步机制,主要用于协调多个线程的并发访问,实现线程之间的互斥。
1.2 锁的实现机制
Python中的锁是通过Lock对象实现的,Lock的状态只有两种:锁定和未锁定。每当一个线程将一个锁对象锁定后,其他线程就无法再次获取该锁,直到该锁被锁定的线程释放锁。
1.3 使用锁
在Python中,使用锁可以非常简单的实现线程同步。示例代码如下:
import threading
# 创建锁
lock = threading.Lock()
def work():
# 获取锁
lock.acquire()
try:
for i in range(5):
print("{} is running...".format(threading.current_thread().name))
finally:
# 释放锁
lock.release()
if __name__ == '__main__':
# 创建两个线程
t1 = threading.Thread(target=work)
t2 = threading.Thread(target=work)
# 启动线程
t1.start()
t2.start()
在上述示例中,我们创建了两个线程,它们的工作函数是打印5次当前线程的名字。由于这两个线程都需要输出,因此需要使用锁机制进行线程同步。
首先,我们使用threading.Lock()
创建一个锁对象。然后在工作函数中,使用lock.acquire()
获取锁,表示锁被占用。在获取锁后,线程可以执行自己的任务。
最后,使用lock.release()
释放锁,表示任务执行完毕,锁被释放。只有当锁被释放后,其他线程才能获取锁,执行自己的任务。
二、条件变量(Condition)
2.1 什么是条件变量
条件变量是一个线程同步机制,它的主要作用是在多个线程之间协调共享数据的访问。
2.2 条件变量的实现机制
在Python中,条件变量是通过Condition对象实现的。Condition对象内部有一个锁对象,它通过wait()
、notify()
和notify_all()
三个方法协调线程之间的协作。
wait(): 等待条件变量。当条件变量被唤醒时,会重新获取锁,并返回True。
notify(n=1): 唤醒指定数量的线程,使得它们从等待队列中出队,但不会立即释放锁。
notify_all(): 唤醒所有等待的线程。
2.3 使用条件变量
示例1:
import threading
class Producer(threading.Thread):
def __init__(self, cond, name):
super().__init__(name=name)
self.cond = cond
def run(self):
with self.cond:
print("生产者已准备好。")
self.cond.notify()
class Consumer(threading.Thread):
def __init__(self, cond, name):
super().__init__(name=name)
self.cond = cond
def run(self):
with self.cond:
print("等待生产者准备中...")
self.cond.wait()
print("消费者已开始消费。")
if __name__ == "__main__":
cond = threading.Condition()
p = Producer(cond, "Producer")
c = Consumer(cond, "Consumer")
p.start()
c.start()
p.join()
c.join()
在上面的示例中,我们使用条件变量协调了两个线程的协作。在Producer
线程中,我们通过notify()
方法唤醒了Consumer
线程,并允许其执行任务。在Consumer
线程中,我们通过wait()
方法等待生产者准备就绪,并等待被唤醒通知。
示例2:
import threading
MAX_NUM = 10
class Producer(threading.Thread):
def __init__(self, cond, name):
super().__init__(name=name)
self.cond = cond
def run(self):
with self.cond:
for i in range(MAX_NUM):
# 生产
print("{} 生产了 {}".format(self.name, i))
# 控制队列长度
if i == MAX_NUM - 1:
self.cond.notify_all()
self.cond.wait()
else:
self.cond.notify()
class Consumer(threading.Thread):
def __init__(self, cond, name):
super().__init__(name=name)
self.cond = cond
def run(self):
with self.cond:
for i in range(MAX_NUM):
self.cond.wait()
# 消费
print("{} 消费了 {}".format(self.name, i))
self.cond.notify()
if __name__ == "__main__":
cond = threading.Condition()
p = Producer(cond, "Producer")
c1 = Consumer(cond, "Consumer1")
c2 = Consumer(cond, "Consumer2")
p.start()
c1.start()
c2.start()
p.join()
c1.join()
c2.join()
在上面的示例中,我们使用条件变量模拟了一个进程间通信的生产者-消费者模型。在Producer
线程中,我们每次生成一个数据并使用notify()
方法通知一个消费者。在最后一次生成一个数据时,我们使用notify_all()
方法,将所有等待中的消费者都唤醒并等待生产者再次notify()
。在Consumer
线程中,我们首先使用wait()
方法等待生产者的通知,随后打印信息并使用notify()
方法通知生产者继续往队列中生产。