Dynamically create a list of shared arrays using python multiprocessing

Question

I'd like to share several numpy arrays between different child processes with python's multiprocessing module. I'd like the arrays to be separately lockable, and I'd like the number of arrays to be dynamically determined at runtime. Is this possible?

In this answer, J.F. Sebastian lays out a nice way to use python's numpy arrays in shared memory while multiprocessing. The array is lockable, which is what I want. I would like to do something very similar, except with a variable number of shared arrays. The number of arrays would be determined at runtime. His example code is very clear and does almost exactly what I want, but I'm unclear how to declare a variable number of such arrays without giving each one of them a hard-coded name like shared_arr_1, shared_arr_2, et cetera. What's the right way to do this?
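
The gist of the linked technique generalizes directly: each mp.Array bundles its own lock with its memory, so a plain list comprehension produces any runtime-determined number of separately lockable arrays, with no need for hard-coded names. A minimal sketch under that assumption follows; the names make_shared_buffers and as_numpy are illustrative, not from the original answer:

import ctypes
import multiprocessing as mp
import numpy as np

def make_shared_buffers(num_buffers, num_elements):
    # Each mp.Array carries its own lock, so the buffers in this list
    # are separately lockable, and num_buffers can be chosen at runtime.
    return [mp.Array(ctypes.c_uint16, num_elements)
            for _ in range(num_buffers)]

def as_numpy(shared_arr):
    # View the shared memory as a numpy array, without copying.
    return np.frombuffer(shared_arr.get_obj(), dtype=np.uint16)

if __name__ == '__main__':
    buffers = make_shared_buffers(num_buffers=5, num_elements=60 * 256 * 512)
    with buffers[0].get_lock():  # lock only the first buffer
        as_numpy(buffers[0]).fill(1)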

Answer

Turns out this was easier than I thought! Following J.F. Sebastian's encouragement, here's my crack at an answer:

import time
import ctypes
import logging
import Queue
import multiprocessing as mp
import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    data_pipeline = Image_Data_Pipeline(
        num_data_buffers=5,
        buffer_shape=(60, 256, 512))
    start = time.clock()
    data_pipeline.load_buffers(data_pipeline.num_data_buffers)
    end = time.clock()
    data_pipeline.close()
    print "Elapsed time:", end-start


class Image_Data_Pipeline:
    def __init__(self, num_data_buffers, buffer_shape):
        """
        Allocate a bunch of 16-bit buffers for image data
        """
        self.num_data_buffers = num_data_buffers
        self.buffer_shape = buffer_shape
        pix_per_buf = np.prod(buffer_shape)
        self.data_buffers = [mp.Array(ctypes.c_uint16, pix_per_buf)
                             for b in range(num_data_buffers)]
        self.idle_data_buffers = range(num_data_buffers)

        """
        Launch the child processes that make up the pipeline
        """
        self.camera = Data_Pipeline_Process(
            target=child_process, name='Camera',
            data_buffers=self.data_buffers, buffer_shape=buffer_shape)
        self.display_prep = Data_Pipeline_Process(
            target=child_process, name='Display Prep',
            data_buffers=self.data_buffers, buffer_shape=buffer_shape,
            input_queue=self.camera.output_queue)
        self.file_saving = Data_Pipeline_Process(
            target=child_process, name='File Saving',
            data_buffers=self.data_buffers, buffer_shape=buffer_shape,
            input_queue=self.display_prep.output_queue)
        return None

    def load_buffers(self, N, timeout=0):
        """
        Feed the pipe!
        """
        for i in range(N):
            self.camera.input_queue.put(self.idle_data_buffers.pop())

        """
        Wait for the buffers to idle. Here would be a fine place to
        feed them back to the pipeline, too.
        """
        while True:
            try:
                self.idle_data_buffers.append(
                    self.file_saving.output_queue.get_nowait())
                info("Buffer %i idle"%(self.idle_data_buffers[-1]))
            except Queue.Empty:
                time.sleep(0.01)
            if len(self.idle_data_buffers) >= self.num_data_buffers:
                break
        return None

    def close(self):
        self.camera.input_queue.put(None)
        self.display_prep.input_queue.put(None)
        self.file_saving.input_queue.put(None)
        self.camera.child.join()
        self.display_prep.child.join()
        self.file_saving.child.join()


class Data_Pipeline_Process:
    def __init__(
        self,
        target,
        name,
        data_buffers,
        buffer_shape,
        input_queue=None,
        output_queue=None,
        ):
        if input_queue is None:
            self.input_queue = mp.Queue()
        else:
            self.input_queue = input_queue

        if output_queue is None:
            self.output_queue = mp.Queue()
        else:
            self.output_queue = output_queue

        self.command_pipe = mp.Pipe() #For later, we'll send instrument commands

        self.child = mp.Process(
            target=target,
            args=(name, data_buffers, buffer_shape,
                  self.input_queue, self.output_queue, self.command_pipe),
            name=name)
        self.child.start()
        return None

def child_process(
    name,
    data_buffers,
    buffer_shape,
    input_queue,
    output_queue,
    command_pipe):
    if name == 'Display Prep':
        display_buffer = np.empty(buffer_shape, dtype=np.uint16)
    while True:
        try:
            process_me = input_queue.get_nowait()
        except Queue.Empty:
            time.sleep(0.01)
            continue
        if process_me is None:
            break #We're done
        else:
            info("start buffer %i"%(process_me))
            with data_buffers[process_me].get_lock():
                a = np.frombuffer(data_buffers[process_me].get_obj(),
                                  dtype=np.uint16)
                if name == 'Camera':
                    """
                    Fill the buffer with data (eventually, from the
                    camera, dummy data for now)
                    """
                    a.fill(1)
                elif name == 'Display Prep':
                    """
                    Process the 16-bit image into a display-ready
                    8-bit image. Fow now, just copy the data to a
                    similar buffer.
                    """
                    display_buffer[:] = a.reshape(buffer_shape)
                elif name == 'File Saving':
                    """
                    Save the data to disk.
                    """
                    a.tofile('out.raw')
            info("end buffer %i"%(process_me))
            output_queue.put(process_me)
    return None

if __name__ == '__main__':
    main()

Background: This is the skeleton of a data-acquisition pipeline. I want to acquire data at a very high rate, process it for on-screen display, and save it to disk. I don't ever want display rate or disk rate to limit acquisition, which is why I think using separate child processes in individual processing loops is appropriate.

Here's typical output of the dummy program:

C:\code\instrument_control>c:\Python27\python.exe test.py
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[INFO/MainProcess] allocating a new mmap of length 15728640
[[INFO/Camera] child process calling self.run()
INFO/Display Prep] child process calling self.run()
[INFO/Camera] start buffer 4
[INFO/File Saving] child process calling self.run()
[INFO/Camera] end buffer 4
[INFO/Camera] start buffer 3
[INFO/Camera] end buffer 3
[INFO/Camera] start buffer 2
[INFO/Display Prep] start buffer 4
[INFO/Camera] end buffer 2
[INFO/Camera] start buffer 1
[INFO/Camera] end buffer 1
[INFO/Camera] start buffer 0
[INFO/Camera] end buffer 0
[INFO/Display Prep] end buffer 4
[INFO/Display Prep] start buffer 3
[INFO/File Saving] start buffer 4
[INFO/Display Prep] end buffer 3
[INFO/Display Prep] start buffer 2
[INFO/File Saving] end buffer 4
[INFO/File Saving] start buffer 3
[INFO/MainProcess] Buffer 4 idle
[INFO/Display Prep] end buffer 2
[INFO/Display Prep] start buffer 1
[INFO/File Saving] end buffer 3
[INFO/File Saving] start buffer 2
[INFO/MainProcess] Buffer 3 idle
[INFO/Display Prep] end buffer 1
[INFO/Display Prep] start buffer 0
[INFO/File Saving] end buffer 2
[INFO/File Saving] start buffer 1
[[INFO/MainProcess] Buffer 2 idle
INFO/Display Prep] end buffer 0
[INFO/File Saving] end buffer 1
[INFO/File Saving] start buffer 0
[INFO/MainProcess] Buffer 1 idle
[INFO/File Saving] end buffer 0
[INFO/MainProcess] Buffer 0 idle
[INFO/Camera] process shutting down
[INFO/Camera] process exiting with exitcode 0
[INFO/Display Prep] process shutting down
[INFO/File Saving] process shutting down
[INFO/Display Prep] process exiting with exitcode 0
[INFO/File Saving] process exiting with exitcode 0
Elapsed time: 0.263240348548
[INFO/MainProcess] process shutting down

C:\code\instrument_control>

It seems to do what I want: the data gets processed for display and saved to disk without interfering with the acquisition rate.
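
One caveat for anyone running this today: the listing is Python 2 code (hence import Queue, the print statements, and time.clock). Under Python 3, a handful of substitutions should suffice; a sketch of the likely changes, not tested against the original:

import queue                                    # replaces: import Queue (Queue.Empty becomes queue.Empty)
start = time.perf_counter()                     # time.clock() was removed in Python 3.8
print("Elapsed time:", end - start)             # print is a function in Python 3
self.idle_data_buffers = list(range(num_data_buffers))  # range() is no longer a list, and .pop() needs one
pix_per_buf = int(np.prod(buffer_shape))        # mp.Array's size check wants a plain int, not a numpy integer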
