python多处理:写入相同的excel文件

python multiprocessing: write to same excel file(python多处理:写入相同的excel文件)
本文介绍了python多处理:写入相同的excel文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

我是 Python 新手,我正在尝试将五个不同进程的结果保存到一个 excel 文件中(每个进程写入不同的工作表).我在这里阅读了不同的帖子,但仍然无法完成,因为我对 pool.map、队列和锁非常困惑,而且我不确定这里需要什么来完成这项任务.到目前为止,这是我的代码:

I am new to Python and I am trying to save the results of five different processes to one excel file (each process write to a different sheet). I have read different posts here, but still can't get it done as I'm very confused about pool.map, queues, and locks, and I'm not sure what is required here to fulfill this task. This is my code so far:

list_of_days = ["2017.03.20", "2017.03.21", "2017.03.22", "2017.03.23", "2017.03.24"]
results = pd.DataFrame()

if __name__ == '__main__':
    global list_of_days
    writer = pd.ExcelWriter('myfile.xlsx', engine='xlsxwriter')
    nr_of_cores = multiprocessing.cpu_count()
    l = multiprocessing.Lock()
    pool = multiprocessing.Pool(processes=nr_of_cores, initializer=init, initargs=(l,))
    pool.map(f, range(len(list_of_days)))
    pool.close()
    pool.join()

def init(l):
    global lock
    lock = l

def f(k):
    global results

    *** DO SOME STUFF HERE***

    results = results[ *** finished pandas dataframe *** ]

    lock.acquire()
    results.to_excel(writer, sheet_name=list_of_days[k])
    writer.save()
    lock.release()

结果是在 excel 中只创建了一张工作表(我假设它是最后完成的过程).关于这段代码的一些问题:

The result is that only one sheet gets created in excel (I assume it is the process finishing last). Some questions about this code:

  • 如何避免定义全局变量?
  • 甚至可以传递数据帧吗?
  • 我应该将锁定移至 main 吗?

非常感谢这里的一些输入,因为我认为掌握多处理是有用的.谢谢

Really appreciate some input here, as I consider mastering multiprocessing as instrumental. Thanks

推荐答案

1) 为什么你在第二个方法的几个地方实现了 time.sleep?

1) Why did you implement time.sleep in several places in your 2nd method?

__main__中,time.sleep(0.1),给已启动的process一个时间片来启动.
f2(fq, q) 中,给 queue 一个时间片来刷新所有缓冲的数据到管道和使用 q.get_nowait().
w(q) 中,仅用于测试模拟 writer.to_excel(...) 的长期运行,我删除了这个.

In __main__, time.sleep(0.1), to give the started process a timeslice to startup.
In f2(fq, q), to give the queue a timeslice to flushed all buffered data to the pipe and as q.get_nowait() are used.
In w(q), are only for testing simulating long run of writer.to_excel(...), i removed this one.

2) pool.map 和 pool = [mp.Process( . )] 有什么区别?

2) What is the difference between pool.map and pool = [mp.Process( . )]?

使用 pool.map 不需要 Queue,不传递参数,代码更短.worker_process 必须立即返回 result 并终止.只要所有 iteration 都完成,pool.map 就会开始一个新进程.results 必须在那之后进行处理.

Using pool.map needs no Queue, no parameter passed, shorter code. The worker_process have to return immediately the result and terminates. pool.map starts a new process as long as all iteration are done. The results have to be processed after that.

使用 pool = [mp.Process( . )],启动 n processes.processqueue.Empty

Using pool = [mp.Process( . )], starts n processes. A process terminates on queue.Empty

您能想出一种情况,您更喜欢一种方法而不是另一种方法吗?

Can you think of a situation where you would prefer one method over the other?

方法一:快速设置,序列化,只对结果感兴趣继续.
方法 2:如果您想并行完成所有工作负载.

Methode 1: Quick setup, serialized, only interested in the result to continue.
Methode 2: If you want to do all workload parallel.

不能在进程中使用 global writer.
writer 实例必须属于一个 process.

You could't use global writer in processes.
The writer instance has to belong to one process.

mp.Pool的用法,例如:

def f1(k):
  # *** DO SOME STUFF HERE***
  results = pd.DataFrame(df_)
  return results

if __name__ == '__main__':
    pool = mp.Pool()
    results = pool.map(f1, range(len(list_of_days)))

    writer = pd.ExcelWriter('../test/myfile.xlsx', engine='xlsxwriter')
    for k, result in enumerate(results):
        result.to_excel(writer, sheet_name=list_of_days[k])

    writer.save()
    pool.close()

这导致 .to_excel(...)__main__ 进程中被依次调用.

This leads to .to_excel(...) are called in sequence in the __main__ process.

如果你想要并行 .to_excel(...) 你必须使用 mp.Queue().
例如:

If you want parallel .to_excel(...) you have to use mp.Queue().
For instance:

worker 进程:

# mp.Queue exeptions have to load from
try:
    # Python3
    import queue
except:
    # Python 2
    import Queue as queue

def f2(fq, q):
    while True:
        try:
            k = fq.get_nowait()
        except queue.Empty:
            exit(0)

        # *** DO SOME STUFF HERE***

        results = pd.DataFrame(df_)
        q.put( (list_of_days[k], results) )
        time.sleep(0.1)  

writer 进程:

def w(q):
    writer = pd.ExcelWriter('myfile.xlsx', engine='xlsxwriter')
    while True:
        try:
            titel, result = q.get()
        except ValueError:
            writer.save()
            exit(0)

        result.to_excel(writer, sheet_name=titel)

__main__ 进程:

if __name__ == '__main__':
    w_q = mp.Queue()
    w_p = mp.Process(target=w, args=(w_q,))
    w_p.start()
    time.sleep(0.1)

    f_q = mp.Queue()
    for i in range(len(list_of_days)):
        f_q.put(i)

    pool = [mp.Process(target=f2, args=(f_q, w_q,)) for p in range(os.cpu_count())]
    for p in pool:
        p.start()
        time.sleep(0.1)

    for p in pool:
        p.join()

    w_q.put('STOP')
    w_p.join()

用 Python:3.4.2 - pandas:0.19.2 - xlsxwriter:0.9.6 测试

这篇关于python多处理:写入相同的excel文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

本站部分内容来源互联网,如果有图片或者内容侵犯了您的权益,请联系我们,我们会在确认后第一时间进行删除!

相关文档推荐

build conda package from local python package(从本地 python 包构建 conda 包)
How can I see all packages that depend on a certain package with PIP?(如何使用 PIP 查看依赖于某个包的所有包?)
How to organize multiple python files into a single module without it behaving like a package?(如何将多个 python 文件组织到一个模块中而不像一个包一样?)
Check if requirements are up to date(检查要求是否是最新的)
How to upload new versions of project to PyPI with twine?(如何使用 twine 将新版本的项目上传到 PyPI?)
Why #egg=foo when pip-installing from git repo(为什么从 git repo 进行 pip 安装时 #egg=foo)