Read multiple files using multiprocessing


Problem Description

I need to read some very large text files (100+ MB), process every line with a regex, and store the data in a structure. My structure inherits from defaultdict; it has a read(self) method that reads the file self.file_name.

Look at this very simple (but not real) example. I'm not using a regex here, but I am splitting lines:


import multiprocessing
from collections import defaultdict

def SingleContainer():
    return list()

class Container(defaultdict):
    """
    this class store odd line in self["odd"] and even line in self["even"].
    It is stupid, but it's only an example. In the real case the class
    has additional methods that do computation on readen data.
    """
    def __init__(self,file_name):
        if type(file_name) != str:
            raise AttributeError, "%s is not a string" % file_name
        defaultdict.__init__(self,SingleContainer)
        self.file_name = file_name
        self.readen_lines = 0
    def read(self):
        f = open(self.file_name)
        print "start reading file %s" % self.file_name
        for line in f:
            self.readen_lines += 1
            values = line.split()
            key = {0: "even", 1: "odd"}[self.readen_lines %2]
            self[key].append(values)
        print "readen %d lines from file %s" % (self.readen_lines, self.file_name)

def do(file_name):
    container = Container(file_name)
    container.read()
    return container.items()

if __name__ == "__main__":
    file_names = ["r1_200909.log", "r1_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    result = pool.map(do,file_names)
    pool.close()
    pool.join()
    print "Finish"      

At the end I need to join all of the results into a single Container. It is important that the order of the lines is preserved. My approach is too slow when returning values. Is there a better solution? I'm using Python 2.6 on Linux.
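The join step itself can preserve line order cheaply, because pool.map returns its results in the same order as its inputs. A minimal sketch of such a merge (a hypothetical helper, not part of the original code; the per-file results are items() lists as returned by do()):

```python
from collections import defaultdict

def merge(results):
    # results: one items() list per file, already in file order because
    # pool.map returns outputs in the same order as its inputs
    merged = defaultdict(list)
    for items in results:
        for key, rows in items:
            merged[key].extend(rows)
    return merged

# two hypothetical per-file results
r1 = [("odd", [["a", "b"]]), ("even", [["c", "d"]])]
r2 = [("odd", [["e", "f"]])]
print(merge([r1, r2])["odd"])  # [['a', 'b'], ['e', 'f']]
```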

Recommended Answer

You're probably hitting two problems.

One of them was mentioned: you're reading multiple files at once. Those reads will end up being interleaved, causing disk thrashing. You want to read each whole file at once, and then only parallelize the computation on the data.
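A sketch of that approach, written in modern Python 3 syntax with a throwaway temp file standing in for the real logs: the parent reads the file sequentially, and only the per-line computation is farmed out. Since pool.map returns results in input order, line order is preserved.

```python
import multiprocessing
import os
import tempfile

def classify(item):
    # stand-in for the per-line regex work; this runs in a worker process
    lineno, line = item
    key = "odd" if lineno % 2 == 1 else "even"
    return key, line.split()

if __name__ == "__main__":
    # hypothetical input file, created only for the demo
    fd, path = tempfile.mkstemp()
    with os.fdopen(fd, "w") as f:
        f.write("a b\nc d\ne f\n")

    # 1) read the whole file sequentially in the parent: no disk thrashing
    with open(path) as f:
        lines = list(enumerate(f, start=1))
    os.unlink(path)

    # 2) farm out only the computation; pool.map preserves input order
    with multiprocessing.Pool(2) as pool:
        results = pool.map(classify, lines)

    print(results[0])  # ('odd', ['a', 'b'])
```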

Second, you're hitting the overhead of Python's multiprocessing module. It's not actually using threads, but instead starting multiple processes and serializing the results through a pipe. That's very slow for bulk data; in fact, it seems to be slower than the work you're doing in the workers (at least in this example). This is a real-world problem caused by the GIL.
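That pipe overhead is pure serialization: everything returned from do() gets pickled in the child and unpickled in the parent. A rough, self-contained way to feel the cost (hypothetical payload size, Python 3 syntax):

```python
import pickle
import time

# stand-in for a bulk result like container.items(): 200,000 small rows
payload = [["field%d" % i] for i in range(200000)]

t0 = time.perf_counter()
blob = pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL)
data = pickle.loads(blob)
t1 = time.perf_counter()

# every byte of this blob would cross the pipe between child and parent
print("round-trip of %d rows: %d bytes, %.3f s" % (len(payload), len(blob), t1 - t0))
```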

If I modify do() to return None instead of container.items(), to disable the extra data copy, this example is faster than a single thread, as long as the files are already cached:

Two threads: 0.36elapsed 168%CPU
One thread (replace pool.map with map): 0:00.52elapsed 98%CPU

Unfortunately, the GIL problem is fundamental and can't be worked around from inside Python.
