使用多处理池的 apply_async 方法时谁运行回调?

Who runs the callback when using apply_async method of a multiprocessing pool?(使用多处理池的 apply_async 方法时谁运行回调?)
本文介绍了使用多处理池的 apply_async 方法时谁运行回调?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

我正在尝试了解使用多处理池的 apply_sync 方法时幕后发生的事情.

I'm trying to understand a little bit of what's going on behind the scenes when using the apply_sync method of a multiprocessing pool.

谁运行回调方法?是调用apply_async的主进程吗?

Who runs the callback method? Is it the main process that called apply_async?

假设我发送了一大堆带有回调的 apply_async 命令,然后继续执行我的程序.当 apply_async 开始到结束时,我的程序仍在做事.回调如何运行我的主进程"?而主进程仍在忙于脚本?

Let's say I send out a whole bunch of apply_async commands with callbacks and then continue with my program. My program is still doing things when the apply_async's start to finish. How does the callback get run my the "main process" while the main process is still busy with the script?

这是一个例子.

import multiprocessing
import time

def callback(x):
    print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x)

def func(x):
    print '{} running func with arg {}'.format(multiprocessing.current_process().name, x)
    return x

pool = multiprocessing.Pool()

args = range(20)

for a in args:
    pool.apply_async(func, (a,), callback=callback)

print '{} going to sleep for a minute'.format(multiprocessing.current_process().name)

t0 = time.time()
while time.time() - t0 < 60:
    pass

print 'Finished with the script'

输出类似于

PoolWorker-1 使用 arg 0 运行 func

PoolWorker-1 running func with arg 0

PoolWorker-2 使用 arg 1 运行 func

PoolWorker-2 running func with arg 1

PoolWorker-3 使用 arg 2 运行 func

PoolWorker-3 running func with arg 2

MainProcess 将要休眠一分钟 <-- 主进程正忙

MainProcess going to sleep for a minute <-- main process is busy

PoolWorker-4 使用 arg 3 运行 func

PoolWorker-4 running func with arg 3

PoolWorker-1 使用 arg 4 运行 func

PoolWorker-1 running func with arg 4

PoolWorker-2 使用 arg 5 运行 func

PoolWorker-2 running func with arg 5

PoolWorker-3 使用 arg 6 运行 func

PoolWorker-3 running func with arg 6

PoolWorker-4 使用 arg 7 运行 func

PoolWorker-4 running func with arg 7

MainProcess 运行回调,参数为 0 <-- 主进程在 while 循环中运行回调!!

MainProcess running callback with arg 0 <-- main process running callback while it's still in the while loop!!

MainProcess 使用 arg 1 运行回调

MainProcess running callback with arg 1

MainProcess 使用 arg 2 运行回调

MainProcess running callback with arg 2

MainProcess 使用 arg 3 运行回调

MainProcess running callback with arg 3

MainProcess 使用 arg 4 运行回调

MainProcess running callback with arg 4

PoolWorker-1 使用 arg 8 运行 func

PoolWorker-1 running func with arg 8

...

完成脚本

MainProcess 如何在 while 循环中间运行回调??

How is MainProcess running the callback while it's in the middle of that while loop??

multiprocessing.Pool 这似乎是一个提示,但我不明白.

There is this statement about the callback in the documentation for multiprocessing.Pool that seems like a hint but I don't understand it.

apply_async(func[, args[, kwds[, callback]]])

apply_async(func[, args[, kwds[, callback]]])

apply() 方法的变体,它返回一个结果对象.

A variant of the apply() method which returns a result object.

如果指定了回调,那么它应该是一个接受单个参数的可调用对象.当结果准备就绪时,将对其应用回调(除非调用失败).回调应该立即完成,否则处理结果的线程将被阻塞.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.

推荐答案

文档里确实有提示:

回调应该立即完成,因为 否则线程处理结果将被阻止.

callback should complete immediately since otherwise the thread which handles the results will get blocked.

回调在主进程中处理,但它们在自己的单独线程中运行.当您创建 Pool 时,它实际上会在内部创建一些 Thread 对象:

The callbacks are handled in the main process, but they're run in their own separate thread. When you create a Pool it actually creates a few Thread objects internally:

class Pool(object):
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        ... # stuff we don't care about
        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN 
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN 
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

对我们来说有趣的线程是 _result_handler;我们很快就会知道原因.

The interesting thread for us is _result_handler; we'll get to why shortly.

切换一秒钟,当你运行 apply_async 时,它会在内部创建一个 ApplyResult 对象来管理从孩子那里获取结果:

Switching gears for a second, when you run apply_async, it creates an ApplyResult object internally to manage getting the result from the child:

def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self


    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

如您所见,假设任务成功,_set 方法最终实际执行传入的 callback 方法.另请注意,它会将自身添加到 __init__ 末尾的全局 cache 字典中.

As you can see, the _set method is the one that ends up actually executing the callback passed in, assuming the task was successful. Also notice that it adds itself to a global cache dict at the end of __init__.

现在,回到 _result_handler 线程对象.该对象调用 _handle_results 函数,如下所示:

Now, back to the _result_handler thread object. That object calls the _handle_results function, which looks like this:

    while 1:
        try:
            task = get()
        except (IOError, EOFError):
            debug('result handler got EOFError/IOError -- exiting')
            return

        if thread._state:
            assert thread._state == TERMINATE
            debug('result handler found thread._state=TERMINATE')
            break

        if task is None:
            debug('result handler got sentinel')
            break

        job, i, obj = task
        try:
            cache[job]._set(i, obj)  # Here is _set (and therefore our callback) being called!
        except KeyError:
            pass

        # More stuff

这是一个循环,它只是从队列中提取子节点的结果,在 cache 中找到它的条目,然后调用 _set,它会执行我们的回调.即使您处于循环中,它也可以运行,因为它没有在主线程中运行.

It's a loop that just pulls results from children out of queue, finds the entry for it in cache, and calls _set, which executes our callback. It's able to run even though you're in a loop because it isn't running in the main thread.

这篇关于使用多处理池的 apply_async 方法时谁运行回调?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

本站部分内容来源互联网,如果有图片或者内容侵犯了您的权益,请联系我们,我们会在确认后第一时间进行删除!

相关文档推荐

build conda package from local python package(从本地 python 包构建 conda 包)
How can I see all packages that depend on a certain package with PIP?(如何使用 PIP 查看依赖于某个包的所有包?)
How to organize multiple python files into a single module without it behaving like a package?(如何将多个 python 文件组织到一个模块中而不像一个包一样?)
Check if requirements are up to date(检查要求是否是最新的)
How to upload new versions of project to PyPI with twine?(如何使用 twine 将新版本的项目上传到 PyPI?)
Why #egg=foo when pip-installing from git repo(为什么从 git repo 进行 pip 安装时 #egg=foo)