Do multiprocessing pools give every process the same number of tasks, or are they assigned as available?

Problem description

When you map an iterable to a multiprocessing.Pool are the iterations divided into a queue for each process in the pool at the start, or is there a common queue from which a task is taken when a process comes free?

    import multiprocessing

    def generate_stuff():
        for foo in range(100):
            yield foo

    def process(moo):
        print(moo)

    pool = multiprocessing.Pool()
    pool.map(func=process, iterable=generate_stuff())
    pool.close()

So given this untested suggestion code: if there are 4 processes in the pool, does each process get allocated 25 stuffs to do, or do the 100 stuffs get picked off one by one by processes looking for stuff to do, so that each process might do a different number of stuffs, e.g. 30, 26, 24, 20?

Recommended answer

So given this untested suggestion code: if there are 4 processes in the pool, does each process get allocated 25 stuffs to do, or do the 100 stuffs get picked off one by one by processes looking for stuff to do, so that each process might do a different number of stuffs, e.g. 30, 26, 24, 20?

Well, the obvious answer is to test it.

As-is, the test may not tell you much, because the jobs are going to finish ASAP, and it's possible that things will end up evenly distributed even if pooled processes grab jobs as they become ready. But there's an easy way to fix that:

import collections
import multiprocessing
import os
import random
import time

def generate_stuff():
    for foo in range(100):
        yield foo

def process(moo):
    # print(moo)
    # sleep for a random 0-5 seconds so the jobs don't all finish instantly
    time.sleep(random.randint(0, 50) / 10.)
    return os.getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    # chunksize=1 hands the items out one at a time instead of in batches
    pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
    pool.close()
    pool.join()
    print(collections.Counter(pids))

If the numbers are "jagged", you know that the pooled processes must be grabbing new jobs as they become ready. (I explicitly set chunksize to 1 to make sure the chunks aren't so big that each process only gets one chunk in the first place.)

When I run it on an 8-core machine:

Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})

So, it looks like the processes are getting new jobs on the fly.

Since you specifically asked about 4 workers, I changed Pool() to Pool(4) and got this:

Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})
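
For contrast, you can also flip the experiment around (this variant is my own addition, not part of the original answer): pass a deliberately large chunksize. With chunksize=25 the 100 items are packed into only 4 tasks, and since each chunk is executed entirely by whichever worker picks it up, every per-PID count should come out as a multiple of 25 rather than "jagged":

import collections
import multiprocessing
import os
import random
import time

def generate_stuff():
    for foo in range(100):
        yield foo

def process(moo):
    time.sleep(random.randint(0, 50) / 10.)
    return os.getpid()

if __name__ == '__main__':
    # 100 items / chunksize 25 = 4 tasks for 4 workers, so each worker's
    # count is a whole multiple of 25 instead of the jagged counts above.
    pool = multiprocessing.Pool(4)
    pids = pool.map(func=process, iterable=generate_stuff(), chunksize=25)
    pool.close()
    pool.join()
    print(collections.Counter(pids))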

However, there's an even better way to find out than by testing: read the source.

As you can see, map just calls map_async, which creates a bunch of batches and puts them on a self._taskqueue object (a Queue.Queue instance). If you read further, this queue isn't shared with the other processes directly, but there's a pool manager thread that, whenever a process finishes and returns a result, pops the next job off the queue and submits it back to the process.
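
If you want to poke at those pieces without digging through the source, they're visible right on a Pool instance; a minimal sketch, with the caveat that _taskqueue and _task_handler are CPython implementation details rather than public API and may differ across versions:

import multiprocessing

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    # Internal attributes of multiprocessing.pool.Pool described above:
    # the in-process queue that map_async puts task batches on, and the
    # background thread that feeds those batches to the worker processes.
    print(pool._taskqueue)      # a queue.Queue (Queue.Queue on Python 2)
    print(pool._task_handler)   # the "pool manager" thread
    pool.close()
    pool.join()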

This is also how you can find out what the default chunksize is for map. The 2.7 implementation linked above shows that it's just len(iterable) / (len(self._pool) * 4) rounded up (slightly more verbose than that to avoid fractional arithmetic)—or, put another way, just big enough for about 4 chunks per process. But you really shouldn't rely on this; the documentation vaguely and indirectly implies that it's going to use some kind of heuristic, but doesn't give you any guarantees as to what that will be. So, if you really need "about 4 chunks per process", calculate it explicitly. More realistically, if you ever need anything besides the default, you probably need a domain-specific value that you're going to work out (by calculation, guessing, or profiling).
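
As a sketch of that arithmetic (my own restatement of the heuristic described above, with a hypothetical helper name, not a quote of the stdlib source):

def default_chunksize(n_items, n_workers):
    # The 2.7 heuristic described above: len(iterable) / (len(self._pool) * 4),
    # rounded up -- i.e. big enough for roughly 4 chunks per worker process.
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize

print(default_chunksize(100, 8))   # 4 -> default for 100 items on 8 workers
print(default_chunksize(100, 4))   # 7 -> default for 100 items on 4 workers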
