Problem description
As the title says, I'm trying to use PriorityQueue with multiprocessing. More precisely, I wanted to make a shared PriorityQueue, so I wrote some code, but it doesn't behave as I expected.
Here is the code:
import time
from multiprocessing import Process, Lock
from Queue import PriorityQueue

def worker(queue):
    lock = Lock()
    with lock:
        for i in range(100):
            queue.put(i)
    print "worker", queue.qsize()

pr_queue = PriorityQueue()
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()
time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()
I get the following output:
worker 100
main 0
What's happening, and what is the right way to do what I want? Thank you.
Recommended answer
The problem isn't that the queue isn't picklable in this case: if you're using a Unix-like platform, the queue can be passed to the child without pickling. (On Windows, I think you would get a pickling error here, though.) The root problem is that you're not using a process-safe queue, so the child fills its own copy of the queue and the one in the main process stays empty. The only queues that can be used between processes are the Queue objects that live inside the multiprocessing module. Unfortunately, there is no PriorityQueue implementation available there. However, you can easily create one by registering a PriorityQueue with a multiprocessing.Manager class, like this:
import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from Queue import PriorityQueue

class MyManager(SyncManager):
    pass

MyManager.register("PriorityQueue", PriorityQueue)  # Register a shared PriorityQueue

def Manager():
    m = MyManager()
    m.start()
    return m

def worker(queue):
    print(queue)
    for i in range(100):
        queue.put(i)
    print "worker", queue.qsize()

m = Manager()
pr_queue = m.PriorityQueue()  # This is process-safe
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()
time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()
Output:
worker 100
main 100
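For readers on Python 3, the same approach might look roughly like the sketch below. It assumes the `queue` module name (Python 3's rename of `Queue`), and the names `PriorityQueueManager` and `run_demo` are made up for illustration. It also waits with `join()` rather than `sleep()`, which is a substitution on my part and not part of the code above.

```python
import multiprocessing
from multiprocessing.managers import SyncManager
from queue import PriorityQueue  # Python 3 name of the Python 2 `Queue` module


class PriorityQueueManager(SyncManager):
    """Hypothetical manager subclass that can also serve a PriorityQueue."""


# Register at import time so every process that imports this module agrees.
PriorityQueueManager.register("PriorityQueue", PriorityQueue)


def worker(shared_queue, items):
    # Each put() goes through the proxy to the manager's server process.
    for item in items:
        shared_queue.put(item)


def run_demo():
    """Fill the shared queue from a child process, then drain it in the parent."""
    # Entering the `with` block starts the server; leaving it shuts it down.
    with PriorityQueueManager() as manager:
        shared_queue = manager.PriorityQueue()
        child = multiprocessing.Process(
            target=worker, args=(shared_queue, [5, 1, 4, 2, 3])
        )
        child.start()
        child.join()  # wait for the child to finish instead of sleeping
        size = shared_queue.qsize()
        drained = [shared_queue.get() for _ in range(size)]
    return size, drained


if __name__ == "__main__":
    print(run_demo())
```

With this, the parent should see all five items and get them back in ascending order, since the ordering is done by the one real PriorityQueue inside the server process.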
Note that this probably won't perform quite as well as it would if it were a standard multiprocessing.Queue subclass. The Manager-based PriorityQueue is implemented by creating a Manager server process that actually holds a regular PriorityQueue, and then providing your main and worker processes with Proxy objects that use IPC to read from and write to the queue in the server process. Regular multiprocessing.Queue objects just write data to and read data from a Pipe. If that's a concern, you could try implementing your own multiprocessing.PriorityQueue by subclassing or delegating to multiprocessing.Queue. It may not be worth the effort, though.
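To give a feel for what "delegating" could mean, here is a rough Python 3 sketch, not a drop-in replacement. The class `DrainingPriorityQueue` and the helpers `producer` and `run_demo` are hypothetical names of mine. It uses a plain multiprocessing.Queue as the transport and keeps a local heap in the consuming process, so it only orders items that have already arrived, and it only makes sense with a single consumer (each process would have its own heap).

```python
import heapq
import multiprocessing
import queue  # only needed for the queue.Empty exception


class DrainingPriorityQueue:
    """Hypothetical wrapper: multiprocessing.Queue transport plus a local heap."""

    def __init__(self):
        self._transport = multiprocessing.Queue()
        self._heap = []  # private to whichever process calls get()

    def put(self, item):
        # Producers only write to the pipe-backed queue; no ordering happens here.
        self._transport.put(item)

    def get(self):
        if not self._heap:
            # Nothing buffered locally, so block until one item arrives.
            heapq.heappush(self._heap, self._transport.get())
        # Pull in whatever else is already waiting, without blocking.
        while True:
            try:
                heapq.heappush(self._heap, self._transport.get_nowait())
            except queue.Empty:
                break
        return heapq.heappop(self._heap)


def producer(pq, items):
    for item in items:
        pq.put(item)


def run_demo():
    pq = DrainingPriorityQueue()
    child = multiprocessing.Process(target=producer, args=(pq, [5, 1, 4, 2, 3]))
    child.start()
    # Joining before reading is only safe here because the payload is tiny;
    # a large backlog could fill the pipe and keep the child from exiting.
    child.join()
    return [pq.get() for _ in range(5)]


if __name__ == "__main__":
    print(run_demo())
```

Because the child has finished before the parent starts reading, everything is already in the pipe and the items should come back in ascending order. If the producer is still running, an item with a smaller priority value can arrive after a larger one has been handed out, which is exactly the guarantee the Manager-based version gives you and this one does not.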