问题描述
我正在构建一个 python 模块来从大量文本中提取标签,虽然它的结果质量很高,但它的执行速度非常慢.我试图通过使用多处理来加速这个过程,这也很有效,直到我尝试引入一个锁,以便一次只有一个进程连接到我们的数据库.我一生都无法弄清楚如何完成这项工作-尽管进行了很多搜索和调整,但我仍然收到 PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock 失败
.这是有问题的代码 - 在我尝试将锁定对象作为 f
的参数传递之前它运行良好.
I'm building a python module to extract tags from a large corpus of text, and while its results are high quality it executes very slowly. I'm trying to speed the process up by using multiprocessing, and that was working too, until I tried to introduce a lock so that only one process was connecting to our database at a time. I can't figure out for the life of me how to make this work - despite much searching and tweaking I am still getting a PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
. Here's the offending code - it worked fine until I tried to pass a lock object as an argument for f
.
def make_network(initial_tag, max_tags = 2, max_iter = 3):
manager = Manager()
lock = manager.Lock()
pool = manager.Pool(8)
# this is a very expensive function that I would like to parallelize
# over a list of tags. It involves a (relatively cheap) call to an external
# database, which needs a lock to avoid simultaneous queries. It takes a list
# of strings (tags) as its sole argument, and returns a list of sets with entries
# corresponding to the input list.
f = partial(get_more_tags, max_tags = max_tags, lock = lock)
def _recursively_find_more_tags(tags, level):
if level >= max_iter:
raise StopIteration
new_tags = pool.map(f, tags)
to_search = []
for i, s in zip(tags, new_tags):
for t in s:
joined = ' '.join(t)
print i + "|" + joined
to_search.append(joined)
try:
return _recursively_find_more_tags(to_search, level+1)
except StopIteration:
return None
_recursively_find_more_tags([initial_tag], 0)
推荐答案
你的问题是锁对象不可picklable.在这种情况下,我可以为您看到两种可能的解决方案.
Your problem is that lock objects are not picklable. I can see two possible solutions for you in that case.
为避免这种情况,您可以将锁变量设为全局变量.然后,您将能够在池进程函数中直接将其作为全局变量引用,而不必将其作为参数传递给池进程函数.这是因为 Python 在创建池进程时使用
OS fork
机制,因此将创建池进程的进程的全部内容复制到它们.这是将锁传递给使用 multiprocessing 包创建的 Python 进程的唯一方法.顺便说一句,没有必要只为这个锁使用Manager
类.通过此更改,您的代码将如下所示:
To avoid this, you can make your lock variable a global variable. Then you will be able to reference it within your pool process function directly as a global variable, and will not have to pass it as an argument to the pool process function. This works because Python uses the
OS fork
mechanism when creating the pool processes and hence copies the entire contents of the process that creates the pool processes to them. This is the only way of passing a lock to a Python process created with the multiprocessing package. Incidentally, it is not necessary to use theManager
class just for this lock. With this change your code would look like this:
import multiprocessing
from functools import partial
lock = None # Global definition of lock
pool = None # Global definition of pool
def make_network(initial_tag, max_tags=2, max_iter=3):
global lock
global pool
lock = multiprocessing.Lock()
pool = multiprocessing.Pool(8)
def get_more_tags():
global lock
pass
# this is a very expensive function that I would like to parallelize
# over a list of tags. It involves a (relatively cheap) call to an external
# database, which needs a lock to avoid simultaneous queries. It takes a
# list of strings (tags) as its sole argument, and returns a list of sets
# with entries corresponding to the input list.
f = partial(get_more_tags, max_tags=max_tags)
def _recursively_find_more_tags(tags, level):
global pool
if level >= max_iter:
raise StopIteration
new_tags = pool.map(f, tags)
to_search = []
for i, s in zip(tags, new_tags):
for t in s:
joined = ' '.join(t)
print(i + "|" + joined)
to_search.append(joined)
try:
return _recursively_find_more_tags(to_search, level + 1)
except StopIteration:
return None
_recursively_find_more_tags([initial_tag], 0)
在您的真实代码中,锁和池变量可能是类实例变量.
In your real code, it is possible that the lock and pool variables might be class instance variables.
- 完全避免使用锁但开销可能稍高的第二种解决方案是使用
multiprocessing.Process
创建另一个进程并通过multiprocessing.Queue代码> 到您的每个池进程.此过程将负责运行您的数据库查询.您将使用队列来允许池进程将参数发送到管理数据库查询的进程.由于所有池进程将使用相同的队列,因此对数据库的访问将自动序列化.额外的开销将来自数据库查询参数和查询响应的酸洗/解酸.请注意,您可以将
multiprocessing.Queue
对象作为参数传递给池进程.另请注意,基于multiprocessing.Lock
的解决方案不适用于未使用fork
语义创建进程的Windows
.
- A second solution which avoids the use of locks altogether but which might have slightly higher overhead would be to create another process with
multiprocessing.Process
and connect it via amultiprocessing.Queue
to each of your pool processes. This process would be responsible for running your database query. You would use the queue to allow your pool processes to send parameters to the process that managed the database query. Since all the pool processes would use the same queue, access to the database would automatically be serialized. The additional overheads would come from the pickling/unpickling of the database query arguments and the query response. Note that you can pass amultiprocessing.Queue
object to a pool process as an argument. Note also that themultiprocessing.Lock
based solution would not work onWindows
where process are not created withfork
semantics.
这篇关于使用带有 multiprocessing.Pool 的锁时遇到问题:酸洗错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!