用于并行进程的 Python 多处理

Python multiprocessing for parallel processes(用于并行进程的 Python 多处理)
本文介绍了用于并行进程的 Python 多处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

如果这对某些人来说太简单了,我很抱歉,但我仍然不明白 python 的多处理的技巧.我读过
http://docs.python.org/dev/library/multiprocessing
http://pymotw.com/2/multiprocessing/basics.html以及谷歌提供给我的许多其他教程和示例……其中许多也来自这里.

I'm sorry if this is too simple for some people, but I still don't get the trick with python's multiprocessing. I've read
http://docs.python.org/dev/library/multiprocessing
http://pymotw.com/2/multiprocessing/basics.html and many other tutorials and examples that google gives me... many of them from here too.

嗯,我的情况是我必须计算许多 numpy 矩阵,然后我需要将它们存储在单个 numpy 矩阵中.假设我想使用 20 个内核(或者我可以使用 20 个内核),但我没有成功使用池资源,因为它使进程保持活动状态直到池死亡".所以我想做这样的事情:

Well, my situation is that I have to compute many numpy matrices and I need to store them in a single numpy matrix afterwards. Let's say I want to use 20 cores (or that I can use 20 cores) but I haven't managed to successfully use the pool resource since it keeps the processes alive till the pool "dies". So I thought on doing something like this:

from multiprocessing import Process, Queue  
import numpy as np  

def f(q,i):  
     q.put( np.zeros( (4,4) ) ) 

if __name__ == '__main__':   
     q = Queue()   
     for i in range(30):   
          p = Process(target=f, args=(q,))  
          p.start()  
          p.join()  
     result = q.get()  
     while q.empty() == False:
          result += q.get()  
     print result

但是看起来这些进程不是并行运行的,而是按顺序运行的(如果我错了,请纠正我)而且我不知道它们在计算后是否会死掉(所以超过 20处理那些完成了他们的工作的那些,让核心空闲给另一个进程).另外,对于一个非常大的数字(比如 100.000),将所有这些矩阵(可能也很大)存储在一个队列中会占用大量内存,从而使代码变得无用,因为我们的想法是将每个结果都放在每次迭代中在最终结果中,就像使用了一个锁(以及它的 acquire() 和 release() 方法),但是如果这段代码不是用于并行处理,那么锁也是无用的......

but then it looks like the processes don't run in parallel but they run sequentially (please correct me if I'm wrong) and I don't know if they die after they do their computation (so for more than 20 processes the ones that did their part leave the core free for another process). Plus, for a very large number (let's say 100.000), storing all those matrices (which may be really big too) in a queue will use a lot of memory, rendering the code useless since the idea is to put every result on each iteration in the final result, like using a lock (and its acquire() and release() methods), but if this code isn't for parallel processing, the lock is useless too...

我希望有人可以帮助我.

I hope somebody may help me.

提前致谢!

推荐答案

你是对的,在你的例子中它们是按顺序执行的.

You are correct, they are executing sequentially in your example.

p.join() 导致当前线程阻塞,直到它完成执行.您要么希望在 for 循环之外单独加入进程(例如,通过将它们存储在列表中然后对其进行迭代),要么使用类似 numpy.Poolapply_async 带有回调.这也可以让您直接将其添加到结果中,而不是保留对象.

p.join() causes the current thread to block until it is finished executing. You'll either want to join your processes individually outside of your for loop (e.g., by storing them in a list and then iterating over it) or use something like numpy.Pool and apply_async with a callback. That will also let you add it to your results directly rather than keeping the objects around.

例如:

def f(i):  
    return i*np.identity(4)

if __name__ == '__main__':
    p=Pool(5)
    result = np.zeros((4,4))
    def adder(value):
        global result
        result += value

    for i in range(30):
        p.apply_async(f, args=(i,), callback=adder)
    p.close()
    p.join()
    print result

关闭并在最后加入池可确保池的进程已完成并且 result 对象已完成计算.您还可以使用 Pool.imap 作为解决问题的方法进行调查.该特定解决方案看起来像这样:

Closing and then joining the pool at the end ensures that the pool's processes have completed and the result object is finished being computed. You could also investigate using Pool.imap as a solution to your problem. That particular solution would look something like this:

if __name__ == '__main__':
    p=Pool(5)
    result = np.zeros((4,4))

    im = p.imap_unordered(f, range(30), chunksize=5)

    for x in im:
        result += x

    print result

这更适合您的具体情况,但可能不适用于您最终想要做的任何事情.

This is cleaner for your specific situation, but may not be for whatever you are ultimately trying to do.

至于存储所有不同的结果,如果我理解您的问题,您可以将其添加到回调方法(如上)或使用 imap/imap_unordered (它仍然存储结果,但您会在构建时将其清除).然后它不需要存储比添加到结果中更长的时间.

As to storing all of your varied results, if I understand your question, you can just add it off into a result in the callback method (as above) or item-at-a-time using imap/imap_unordered (which still stores the results, but you'll clear it as it builds). Then it doesn't need to be stored for longer than it takes to add to the result.

这篇关于用于并行进程的 Python 多处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

本站部分内容来源互联网,如果有图片或者内容侵犯了您的权益,请联系我们,我们会在确认后第一时间进行删除!

相关文档推荐

build conda package from local python package(从本地 python 包构建 conda 包)
How can I see all packages that depend on a certain package with PIP?(如何使用 PIP 查看依赖于某个包的所有包?)
How to organize multiple python files into a single module without it behaving like a package?(如何将多个 python 文件组织到一个模块中而不像一个包一样?)
Check if requirements are up to date(检查要求是否是最新的)
How to upload new versions of project to PyPI with twine?(如何使用 twine 将新版本的项目上传到 PyPI?)
Why #egg=foo when pip-installing from git repo(为什么从 git repo 进行 pip 安装时 #egg=foo)