Python 中通过 Semaphore 对象可以限制线程的最大数量,从而控制线程的并发访问。Semaphore 是一种同步工具,用于保证多个线程间访问资源的顺序或安全性。
Semaphore在Python的Threading
模块中实现。Semaphore维护了一个内部计数器,初始提供一个数量参数,来限制并发线程访问的数量。当我们希望限制一定数量的线程访问共享资源时,可以创建一个 Semaphore 对象,并将计数器初始化为限额值。每当线程获取资源时,Semaphores的计数器就减少一。当计数器值为 0 时,任何尝试获取资源的线程都将被阻塞。
具体来看,Semaphore 的使用方法如下:
创建一个Semaphore对象
import threading
# 创建一个Semaphore对象,并指定信号量的数量
semaphore = threading.Semaphore(3) # 限制最大并发数量为3
获取锁
获取Semaphore信号量时,可以使用semaphore.acquire()方法,这个方法会判断当前的 Semaphore 对象信号量计数器是否大于0,如果大于0,则立即减少信号量计数器的值并返回True,否则线程会被阻塞。
# 获取 Semaphore,信号量-1
semaphore.acquire()
# ...
# 访问共享资源
# ...
# 释放 Semaphore,信号量+1
semaphore.release()
释放锁
释放Semaphore锁时,需要使用semaphore.release() 方法。Semaphore的计数器会增加1,使得其他线程可以获取信号量访问共享资源。
# 释放 Semaphore,信号量+1
semaphore.release()
示例1
import time
import threading
semaphore = threading.Semaphore(3) # 最大并发数量为3
def worker():
with semaphore:
print(f'{threading.current_thread().name} 获取到信号量')
time.sleep(1) # 模拟执行操作
print(f'{threading.current_thread().name} 释放信号量')
threads = []
for i in range(5):
t = threading.Thread(target=worker, name=f'thread-{i}')
t.start()
threads.append(t)
for t in threads:
t.join()
在上面的代码中,我们创建了一个Semaphore对象,设置最大并发数量为3。创建了5个线程,同时在每个线程中获取 Semaphore 对象,打印线程名和时间戳,然后睡眠1s,接着释放 Semaphore 对象,打印线程名和时间戳。
输出结果:
thread-0 获取到信号量
thread-1 获取到信号量
thread-2 获取到信号量
thread-1 释放信号量
thread-3 获取到信号量
thread-2 释放信号量
thread-0 释放信号量
thread-4 获取到信号量
thread-3 释放信号量
thread-4 释放信号量
示例2
另一个使用Semaphore来限制最大并发的例子是运用在爬虫中。爬虫是我们需要获取公共资源的应用之一。比如我们需要获取一个网站中很多商品的数据,如果我们同时向网站发出大量的请求,就容易被网站屏蔽。这个时候使用Semaphore可以很好的避免这个问题,代码可以这样实现:
import requests
import threading
class Crawler:
SEMAPHORE = threading.Semaphore(3)
def __init__(self, item):
self.item = item
def run(self):
with self.SEMAPHORE:
print(f'开始爬取商品{self.item}')
# 发送请求
response = requests.get(f'https://www.example.com/products?item={self.item}')
# 解析数据
data = self.parse_data(response.text)
# 保存数据
self.save_data_to_database(data)
print(f'商品{self.item}爬取完毕')
def parse_data(self, data):
# 爬虫解析数据的具体逻辑
pass
def save_data_to_database(self, data):
# 爬虫保存数据到数据库的具体逻辑
pass
items = ['item1', 'item2', 'item3', 'item4', 'item5', 'item6']
crawlers = [Crawler(item) for item in items]
threads = [threading.Thread(target=crawler.run, args=()) for crawler in crawlers]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
在这个例子中,我们首先创建了一个Semaphore对象并设置最大并发数量为3。然后我们创建了Crawler 类,该类代表了一个爬虫任务,随后在run方法中实现了具体的爬虫逻辑。在 run方法中首先获取Semaphore对象,获取到信号量时打印开始爬取一个商品的信息,随后发送请求,解析数据,保存数据到数据库。完成以上操作之后,释放Semaphore对象,打印结束爬取一个商品的信息。
我们创建了6个Crawler对象,分别代表了需要爬取的6个商品,最后在多线程的环境中运行爬虫。Semaphore对象可以限制最大并发数量,保证在同一时间内只有3个爬虫工作,不会让网站服务被过于频繁的访问。