了解多处理:Python 中的共享内存管理、锁和队列

Understanding Multiprocessing: Shared Memory Management, Locks and Queues in Python(了解多处理:Python 中的共享内存管理、锁和队列)
本文介绍了了解多处理:Python 中的共享内存管理、锁和队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

Multiprocessing是python中一个强大的工具,我想更深入地了解它.我想知道什么时候使用 regular Locks 和 队列 以及何时使用多处理 Manager 在所有进程之间共享这些.

Multiprocessing is a powerful tool in python, and I want to understand it more in depth. I want to know when to use regular Locks and Queues and when to use a multiprocessing Manager to share these among all processes.

我想出了以下测试场景,其中包含四种不同的多处理条件:

I came up with the following testing scenarios with four different conditions for multiprocessing:

  1. 使用池和NO经理

使用池和管理器

使用单独的流程和NO经理

使用单独的流程和经理

工作

所有条件都执行一个作业函数the_job.the_job 包含一些由锁保护的打印.此外,函数的输入只是简单地放入一个队列中(看是否可以从队列中恢复).此输入只是在名为 start_scenario 的主脚本中创建的 range(10) 中的索引 idx(显示在底部).

The Job

All conditions execute a job function the_job. the_job consists of some printing which is secured by a lock. Moreover, the input to the function is simply put into a queue (to see if it can be recovered from the queue). This input is simply an index idx from range(10) created in the main script called start_scenario (shown at the bottom).

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf
'
    who= ' By run %d 
' % idx
    print who
    lock.release()

    queue.put(idx)

一个条件的成功被定义为完美地召回了输入从队列中,查看底部的函数read_queue.

The success of a condition is defined as perfectly recalling the input from the queue, see the function read_queue at the bottom.

条件 1 和 2 是不言自明的.条件 1 涉及创建锁和队列,并将它们传递给进程池:

Condition 1 and 2 are rather self-explanatory. Condition 1 involves creating a lock and a queue, and passing these to a process pool:

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.imap(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)

(帮助函数 make_iterator 在这篇文章的底部给出.)条件 1 失败并出现 RuntimeError: Lock objects should only be shared between processes through inheritance.

(The helper function make_iterator is given at the bottom of this post.) Conditions 1 fails with RuntimeError: Lock objects should only be shared between processes through inheritance.

条件 2 很相似,但现在锁和队列都在管理员的监督下:

Condition 2 is rather similar but now the lock and queue are under the supervision of a manager:

def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)

在条件 3 中,手动启动新进程,并且在没有管理器的情况下创建锁和队列:

In condition 3 new processes are started manually, and the lock and queue are created without a manager:

def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

条件 4 类似,但现在再次使用经理:

Condition 4 is similar but again now using a manager:

def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

在这两种情况下 - 3 和 4 - 我开始一个新的the_job 的 10 个任务中的每一个的进程,最多有 ncores 个进程同时运行.这是通过以下辅助函数实现的:

In both conditions - 3 and 4 - I start a new process for each of the 10 tasks of the_job with at most ncores processes operating at the very same time. This is achieved with the following helper function:

def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)

结果

只有条件 1 失败(RuntimeError: Lock objects 只能通过继承在进程之间共享),而其他 3 个条件成功.我试图绕过这个结果.

The Outcome

Only condition 1 fails (RuntimeError: Lock objects should only be shared between processes through inheritance) whereas the other 3 conditions are successful. I try to wrap my head around this outcome.

为什么池需要在所有进程之间共享锁和队列,而条件 3 中的各个进程却不需要?

Why does the pool need to share a lock and queue between all processes but the individual processes from condition 3 don't?

我所知道的是,对于池条件(1 和 2),来自迭代器的所有数据都通过酸洗传递,而在单进程条件(3 和 4)中,来自迭代器的所有数据都通过从主进程继承来传递(我正在使用 Linux).我想在从子进程中更改内存之前,会访问父进程使用的相同内存(写时复制).但是一旦有人说lock.acquire(),就应该改变它,并且子进程确实使用放置在内存中其他地方的不同锁,不是吗?一个子进程如何知道兄弟激活了不是通过管理器共享的锁?

What I know is that for the pool conditions (1 and 2) all data from the iterators is passed via pickling, whereas in single process conditions (3 and 4) all data from the iterators is passed by inheritance from the main process (I am using Linux). I guess until the memory is changed from within a child process, the same memory that the parental process uses is accessed (copy-on-write). But as soon as one says lock.acquire(), this should be changed and the child processes do use different locks placed somewhere else in memory, don't they? How does one child process know that a brother has activated a lock that is not shared via a manager?

最后,有点相关的是我的问题 3 和 4 有多少不同.两者都有单独的流程,但它们在管理器的使用上有所不同.两者都被认为是有效代码吗?或者如果实际上不需要经理,是否应该避免使用经理?

Finally, somewhat related is my question how much different conditions 3 and 4 are. Both having individual processes but they differ in the usage of a manager. Are both considered to be valid code? Or should one avoid using a manager if there is actually no need for one?

对于那些只想复制和粘贴所有内容来执行代码的人,这里是完整的脚本:

For those who simply want to copy and paste everything to execute the code, here is the full script:

__author__ = 'Me and myself'

import multiprocessing as mp
import time

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf
'
    who= ' By run %d 
' % idx
    print who
    lock.release()

    queue.put(idx)


def read_queue(queue):
    """Turns a qeue into a normal python list."""
    results = []
    while not queue.empty():
        result = queue.get()
        results.append(result)
    return results


def make_iterator(args, lock, queue):
    """Makes an iterator over args and passes the lock an queue to each element."""
    return ((arg, lock, queue) for arg in args)


def start_scenario(scenario_number = 1):
    """Starts one of four multiprocessing scenarios.

    :param scenario_number: Index of scenario, 1 to 4

    """
    args = range(10)
    ncores = 3
    if scenario_number==1:
        result =  scenario_1_pool_no_manager(the_job, args, ncores)

    elif scenario_number==2:
        result =  scenario_2_pool_manager(the_job, args, ncores)

    elif scenario_number==3:
        result =  scenario_3_single_processes_no_manager(the_job, args, ncores)

    elif scenario_number==4:
        result =  scenario_4_single_processes_manager(the_job, args, ncores)

    if result != args:
        print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
    else:
        print 'Scenario %d successful!' % scenario_number


def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.map(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)


def main():
    """Runs 1 out of 4 different multiprocessing scenarios"""
    start_scenario(1)


if __name__ == '__main__':
    main()

推荐答案

multiprocessing.Lock 是使用操作系统提供的 Semaphore 对象实现的.在 Linux 上,子进程只是通过 os.fork 从父进程继承信号量的句柄.这不是信号量的副本;它实际上继承了父级具有的相同句柄,可以继承文件描述符的相同方式.另一方面,Windows 不支持 os.fork,因此它必须腌制 Lock.它通过使用 Windows DuplicateHandle API,其中指出:

multiprocessing.Lock is implemented using a Semaphore object provided by the OS. On Linux, the child just inherits a handle to the Semaphore from the parent via os.fork. This isn't a copy of the semaphore; it's actually inheriting the same handle the parent has, the same way file descriptors can be inherited. Windows on the other hand, doesn't support os.fork, so it has to pickle the Lock. It does this by creating a duplicate handle to the Windows Semaphore used internally by the multiprocessing.Lock object, using the Windows DuplicateHandle API, which states:

复制句柄指的是与原始句柄相同的对象.因此,对对象的任何更改都会通过两者来反映把手

The duplicate handle refers to the same object as the original handle. Therefore, any changes to the object are reflected through both handles

DuplicateHandle API 允许您将复制句柄的所有权授予子进程,以便子进程可以在 unpickling 后实际使用它.通过创建孩子拥有的重复句柄,您可以有效地共享"锁定对象.

The DuplicateHandle API allows you to give ownership of the duplicated handle to the child process, so that the child process can actually use it after unpickling it. By creating a duplicated handle owned by the child, you can effectively "share" the lock object.

这是 multiprocessing/synchronize.py

class SemLock(object):

    def __init__(self, kind, value, maxvalue):
        sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
        debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(obj):
                obj._semlock._after_fork()
            register_after_fork(self, _after_fork)

    def _make_methods(self):
        self.acquire = self._semlock.acquire
        self.release = self._semlock.release
        self.__enter__ = self._semlock.__enter__
        self.__exit__ = self._semlock.__exit__

    def __getstate__(self):  # This is called when you try to pickle the `Lock`.
        assert_spawning(self)
        sl = self._semlock
        return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)

    def __setstate__(self, state): # This is called when unpickling a `Lock`
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        debug('recreated blocker with handle %r' % state[0])
        self._make_methods()

注意 __getstate__ 中的 assert_spawning 调用,它在腌制对象时被调用.以下是它的实现方式:

Note the assert_spawning call in __getstate__, which gets called when pickling the object. Here's how that is implemented:

#
# Check that the current thread is spawning a child process
#

def assert_spawning(self):
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(self).__name__
            )

该函数通过调用 thread_is_spawning 确保您继承"了 Lock.在 Linux 上,该方法只返回 False:

That function is the one that makes sure you're "inheriting" the Lock, by calling thread_is_spawning. On Linux, that method just returns False:

@staticmethod
def thread_is_spawning():
    return False

这是因为Linux不需要pickle来继承Lock,所以如果__getstate__真的在Linux上被调用,我们一定不是继承.在 Windows 上,还有更多事情要做:

This is because Linux doesn't need to pickle to inherit Lock, so if __getstate__ is actually being called on Linux, we must not be inheriting. On Windows, there's more going on:

def dump(obj, file, protocol=None):
    ForkingPickler(file, protocol).dump(obj)

class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    _tls = thread._local()

    def __init__(self, process_obj):
        ...
        # send information to child
        prep_data = get_preparation_data(process_obj._name)
        to_child = os.fdopen(wfd, 'wb')
        Popen._tls.process_handle = int(hp)
        try:
            dump(prep_data, to_child, HIGHEST_PROTOCOL)
            dump(process_obj, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()


    @staticmethod
    def thread_is_spawning():
        return getattr(Popen._tls, 'process_handle', None) is not None

这里,如果 Popen._tls 对象具有 process_handle 属性,则 thread_is_spawning 返回 True.我们可以看到 process_handle 属性是在 __init__ 中创建的,然后我们想要继承的数据使用 dump 从父级传递给子级,然后属性被删除.所以 thread_is_spawning 只会在 __init__ 期间为 True.根据 this python-ideas mailing list thread,这实际上是添加人为限制以模拟与 Linux 上的 os.fork 相同的行为.Windows实际上可以支持随时传递Lock,因为DuplicateHandle可以随时运行.

Here, thread_is_spawning returns True if the Popen._tls object has a process_handle attribute. We can see that the process_handle attribute gets created in __init__, then the data we want inherited is passed from the parent to child using dump, then the attribute is deleted. So thread_is_spawning will only be True during __init__. According to this python-ideas mailing list thread, this is actually an artificial limitation added to simulate the same behavior as os.fork on Linux. Windows actually could support passing the Lock at any time, because DuplicateHandle can be run at any time.

以上所有内容都适用于 Queue 对象,因为它在内部使用 Lock.

All of the above applies to the Queue object because it uses Lock internally.

我想说继承 Lock 对象比使用 Manager.Lock() 更可取,因为当您使用 Manager.Lock,您对 Lock 的每一次调用都必须通过 IPC 发送到 Manager 进程,这比使用共享 Lock 进程要慢得多code> 存在于调用进程中.不过,这两种方法都完全有效.

I would say that inheriting Lock objects is preferable to using a Manager.Lock(), because when you use a Manager.Lock, every single call you make to the Lock must be sent via IPC to the Manager process, which is going to be much slower than using a shared Lock that lives inside the calling process. Both approaches are perfectly valid, though.

最后,可以使用 initializer 将 Lock 传递给 Pool 的所有成员,而无需使用 Manager/initargs 关键字参数:

Finally, it is possible to pass a Lock to all members of a Pool without using a Manager, using the initializer/initargs keyword arguments:

lock = None
def initialize_lock(l):
   global lock
   lock = l

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    """
    lock = mp.Lock()
    mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
    queue = mp.Queue()

    iterator = make_iterator(args, queue)

    mypool.imap(jobfunc, iterator) # Don't pass lock. It has to be used as a global in the child. (This means `jobfunc` would need to be re-written slightly.

    mypool.close()
    mypool.join()

return read_queue(queue)

这是有效的,因为传递给 initargs 的参数被传递给在 Pool<中运行的 Process 对象的 __init__ 方法/code>,所以它们最终会被继承,而不是腌制.

This works because arguments passed to initargs get passed to the __init__ method of the Process objects that run inside the Pool, so they end up being inherited, rather than pickled.

这篇关于了解多处理:Python 中的共享内存管理、锁和队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

本站部分内容来源互联网,如果有图片或者内容侵犯了您的权益,请联系我们,我们会在确认后第一时间进行删除!

相关文档推荐

build conda package from local python package(从本地 python 包构建 conda 包)
How can I see all packages that depend on a certain package with PIP?(如何使用 PIP 查看依赖于某个包的所有包?)
How to organize multiple python files into a single module without it behaving like a package?(如何将多个 python 文件组织到一个模块中而不像一个包一样?)
Check if requirements are up to date(检查要求是否是最新的)
How to upload new versions of project to PyPI with twine?(如何使用 twine 将新版本的项目上传到 PyPI?)
Why #egg=foo when pip-installing from git repo(为什么从 git repo 进行 pip 安装时 #egg=foo)