使用多处理嵌套并行进程 -- python 领域 和 multithreading 领域 和 parallel-processing 领域 和 multiprocessing 领域 和 python-multiprocessing 领域 相关 的问题

Nesting parallel processes using multiprocessing


简体版||繁體版
0
vote

问题

中文

是否有一种方法在已经并行化的函数中并行运行函数?我知道使用multiprocessing.pool()这是不可能作为守护进程无法创建子进程。我相当新的并行计算,并努力找到解决方法。

我目前有几千个计算,需要使用其他一些商业上可用的量子机械码I接口并行运行。每个计算,有三个后续计算,需要在父计算的正常终止时并行执行,如果父计算不正常终止,则这是该点计算的结束。我总是可以将这三个后续计算结合成一个大计算并正常运行 - 尽管我更喜欢并行单独运行。

main当前看起来如此, run() 是父计算是首次运行的一系列点,而 par_nacmes() 是我想要运行的函数与父母正常终止后的三个儿童计算并行。

    def par_nacmes(nacme_input_data):                                                                                                                                                                                                                                                               nacme_dir, nacme_input, index = nacme_input_data  # Unpack info in tuple for the calculation                                                                                                                                                                                                                                           axes_index = get_axis_index(nacme_input)                                                                                                                                                                                                                                                    [norm_term, nacme_outf] = util.run_calculation(molpro_keys, pwd, nacme_dir, nacme_input, index)  # Submit child calculation                                                                                                                                                                                             if norm_term:                                                                                                                                                                                                                                                                                   data.extract_nacme(nacme_outf, molpro_keys['nacme_regex'], index, axes_index)                                                                                                                                                                                                           else:                                                                                                                                                                                                                                                                                           with open('output.log', 'w+') as f:                                                                                                                                                                                                                                                             f.write('NACME Crashed for GP%s - axis %s' % (index, axes_indexdef run(grid_point):                                                                                                                                                                                                                                                                            index, geom = grid_point                                                                                                                                                                                                                                                                    if inputs['code'] == 'molpro':                                                                                                                                                                                                                                                                  [spe_dir, spe_input] = molpro.setup_spe(inputs, geom, pwd, index)                                                                                                                                                                                                                           [norm_term, spe_outf] = util.run_calculation(molpro_keys, pwd, spe_dir, spe_input, index)  # Run each parent calculation                                                                                                                                                                                                   if norm_term:  # If parent calculation terminates normally - Extract data and continue with subsequent calculations for each point                                                                                                                                                                                                                                                                                  data.extract_energies(spe_dir+spe_outf, inputs['spe'], molpro_keys['energy_regex'],                                                                                                                                                                                                                               molpro_keys['cas_prog'], index)                                                                                                                                                                                                                                       if inputs['nacme'] == 'yes':                                                                                                                                                                                                                                                                    [nacme_dir, nacmes_inputs] = molpro.setup_nacme(inputs, geom, spe_dir, index)                                                                                                                                                                                                                                                                                                                                                                                                                         nacmes_data = [(nacme_dir, nacme_inp, index) for nacme_inp in nacmes_inputs] # List of three tuples - each with three elements. Each tuple describes a child calculation to be run in parallel                                                                                                                                                                                                                nacme_pool = multiprocessing.Pool()                                                                                                                                                                                                                                                         nacme_pool.map(par_nacmes, [nacme_input for nacme_input in nacmes_data]) # Run each calculation in list of tuples in parallel                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           if inputs['grad'] == 'yes':                                                                                                                                                                                                                                                                     passelse:                                                                                                                                                                                                                                                                                           with open('output.log', 'w+') as f:                                                                                                                                                                                                                                                             f.write('SPE crashed for GP%s' % index)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          elif inputs['code'] == 'molcas':   # TO DO                                                                                                                                                                                                                                                      passif __name__ == "__main__":                                                                                                                                                                                                                                                                      try:                                                                                                                                                                                                                                                                                            pwd = os.getcwd()  # parent dir                                                                                                                                                                                                                                                             f = open(inp_geom, 'r')                                                                                                                                                                                                                                                                     ref_geom = np.genfromtxt(f, skip_header=2, usecols=(1, 2, 3), encoding=None)                                                                                                                                                                                                                f.close()                                                                                                                                                                                                                                                                                   geom_list = coordinate_generator(ref_geom)  # Generate nuclear coordinates                                                                                                                                                                                                                                                 if inputs['code'] == 'molpro':                                                                                                                                                                                                                                                                  couplings = molpro.coupled_states(inputs['states'][-1])                                                                                                                                                                                                                                 elif inputs['code'] == 'molcas':                                                                                                                                                                                                                                                                pass                                                                                                                                                                                                                                                                                    data = setup.global_data(ref_geom, inputs['states'][-1], couplings, len(geom_list))                                                                                                                                                                                                         run_pool = multiprocessing.Pool()                                                                                                                                                                                                                                                           run_pool.map(run, [(k, v) for k, v in enumerate(geom_list)])  # Run each parent calculation for each set of coordinatesexcept StopIteration:                                                                                                                                                                                                                                                                           print('Please ensure goemetry file is correct.')       

对如何运行这些子计算的任何洞察,每个点都是很大的帮助。我看到一些人建议使用多线程而不是将守护程序设置为false,虽然我不确定这是不是这样做的最佳方法。

英文原文

Is there a way to run a function in parallel within an already parallelised function? I know that using multiprocessing.Pool() this is not possible as a daemonic process can not create a child process. I am fairly new to parallel computing and am struggling to find a workaround.

I currently have several thousand calculations that need to be run in parallel using some other commercially available quantum mechanical code I interface to. Each calculation, has three subsequent calculations that need to be executed in parallel on normal termination of the parent calculation, if the parent calculation does not terminate normally, that is the end of the calculation for that point. I could always combine these three subsequent calculations into one big calculation and run normally - although I would much prefer to run separately in parallel.

Main currently looks like this, run() is the parent calculation that is first run in parallel for a series of points, and par_nacmes() is the function that I want to run in parallel for three child calculations following normal termination of the parent.

  def par_nacmes(nacme_input_data):                                                                                                                                                                                                                                                               nacme_dir, nacme_input, index = nacme_input_data  # Unpack info in tuple for the calculation                                                                                                                                                                                                                                           axes_index = get_axis_index(nacme_input)                                                                                                                                                                                                                                                    [norm_term, nacme_outf] = util.run_calculation(molpro_keys, pwd, nacme_dir, nacme_input, index)  # Submit child calculation                                                                                                                                                                                             if norm_term:                                                                                                                                                                                                                                                                                   data.extract_nacme(nacme_outf, molpro_keys['nacme_regex'], index, axes_index)                                                                                                                                                                                                           else:                                                                                                                                                                                                                                                                                           with open('output.log', 'w+') as f:                                                                                                                                                                                                                                                             f.write('NACME Crashed for GP%s - axis %s' % (index, axes_index))                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       def run(grid_point):                                                                                                                                                                                                                                                                            index, geom = grid_point                                                                                                                                                                                                                                                                    if inputs['code'] == 'molpro':                                                                                                                                                                                                                                                                  [spe_dir, spe_input] = molpro.setup_spe(inputs, geom, pwd, index)                                                                                                                                                                                                                           [norm_term, spe_outf] = util.run_calculation(molpro_keys, pwd, spe_dir, spe_input, index)  # Run each parent calculation                                                                                                                                                                                                   if norm_term:  # If parent calculation terminates normally - Extract data and continue with subsequent calculations for each point                                                                                                                                                                                                                                                                                  data.extract_energies(spe_dir+spe_outf, inputs['spe'], molpro_keys['energy_regex'],                                                                                                                                                                                                                               molpro_keys['cas_prog'], index)                                                                                                                                                                                                                                       if inputs['nacme'] == 'yes':                                                                                                                                                                                                                                                                    [nacme_dir, nacmes_inputs] = molpro.setup_nacme(inputs, geom, spe_dir, index)                                                                                                                                                                                                                                                                                                                                                                                                                         nacmes_data = [(nacme_dir, nacme_inp, index) for nacme_inp in nacmes_inputs] # List of three tuples - each with three elements. Each tuple describes a child calculation to be run in parallel                                                                                                                                                                                                                nacme_pool = multiprocessing.Pool()                                                                                                                                                                                                                                                         nacme_pool.map(par_nacmes, [nacme_input for nacme_input in nacmes_data]) # Run each calculation in list of tuples in parallel                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           if inputs['grad'] == 'yes':                                                                                                                                                                                                                                                                     pass                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            else:                                                                                                                                                                                                                                                                                           with open('output.log', 'w+') as f:                                                                                                                                                                                                                                                             f.write('SPE crashed for GP%s' % indexelif inputs['code'] == 'molcas':   #xc2xa0TO DO                                                                                                                                                                                                                                                      passif __name__ == "__main__":                                                                                                                                                                                                                                                                      try:                                                                                                                                                                                                                                                                                            pwd = os.getcwd()  # parent dir                                                                                                                                                                                                                                                             f = open(inp_geom, 'r')                                                                                                                                                                                                                                                                     ref_geom = np.genfromtxt(f, skip_header=2, usecols=(1, 2, 3), encoding=None)                                                                                                                                                                                                                f.close()                                                                                                                                                                                                                                                                                   geom_list = coordinate_generator(ref_geom)  # Generate nuclear coordinates                                                                                                                                                                                                                                                 if inputs['code'] == 'molpro':                                                                                                                                                                                                                                                                  couplings = molpro.coupled_states(inputs['states'][-1])                                                                                                                                                                                                                                 elif inputs['code'] == 'molcas':                                                                                                                                                                                                                                                                pass                                                                                                                                                                                                                                                                                    data = setup.global_data(ref_geom, inputs['states'][-1], couplings, len(geom_list))                                                                                                                                                                                                         run_pool = multiprocessing.Pool()                                                                                                                                                                                                                                                           run_pool.map(run, [(k, v) for k, v in enumerate(geom_list)])  # Run each parent calculation for each set of coordinatesexcept StopIteration:                                                                                                                                                                                                                                                                           print('Please ensure goemetry file is correct.')     

Any insight on how to run these child calculations in parallel for each point would be a great help. I have seen some people suggest using multi-threading instead or to set daemon to false, although I am unsure if this is the best way to do this.

              

回答列表

0
 
vote

首先,我不知道为什么你必须在ParaLel中运行Par_Nacmes,但如果你有的话:
使用线程来运行它们而不是进程 或者b使用multiprocessing.process来运行运行,但这会涉及大量的开销,所以我个人不会做到。

就是你所要做的就是 替换

  nacme_pool = multiprocessing.Pool()                                                                                                                                                                                                                                                         nacme_pool.map(par_nacmes, [nacme_input for nacme_input in nacmes_data])   

在run() 与

  threads = [] for nacme_input in nacmes_data:      t = Thread(target=par_nacmes, args=(nacme_input,)); t.start()      threads.append(t) for t in threads: t.join()   

或者如果您不在乎踏板已完成或不关心

  for nacme_input in nacmes_data:      t = Thread(target=par_nacmes, args=(nacme_input,)); t.start()   
 

firstly I dont know why you have to run par_nacmes in paralel but if you have to you could:
a use threads to run them instead of processes or b use multiprocessing.Process to run run however that would involve a lot of overhead so I personally wouldn't do it.

for a all you have to do is replace

nacme_pool = multiprocessing.Pool()                                                                                                                                                                                                                                                         nacme_pool.map(par_nacmes, [nacme_input for nacme_input in nacmes_data]) 

in run() with

threads = [] for nacme_input in nacmes_data:      t = Thread(target=par_nacmes, args=(nacme_input,)); t.start()      threads.append(t) for t in threads: t.join() 

or if you dont care if the treads have finished or not

for nacme_input in nacmes_data:      t = Thread(target=par_nacmes, args=(nacme_input,)); t.start() 
 
 

相关问题

0  Python多处理使TKinter GUI再次出现  ( Python multiprocessing make tkinter gui appear again ) 
我正在学习创建一个GUI来开始多处理打印。 当我单击"开始" 按钮时,它会再次绘制GUI。无论如何都要限制它仅限运行多处理打印? 创建了gui.py: def stopstart(): global instance if startbtn['text']=="start": ...

0  如何在Python中的进程的线程中访问管道连接对象?多处理  ( How to access pipe connection object in a thread of a process in python multipr ) 
以下是我的代码 from multiprocessing import Process,Queue,Pipe from threading import Thread import time p,q = Pipe() def bcd(p): print p.recv() def abc(p): ...

0  Python多处理,变量的前初始化  ( Python multiprocessing preinitialization of variables ) 
我正在尝试使用多处理模块并行化我的代码。我正在处理的代码有两步。在第一步中,我初始化一个类,它计算并保存在第二步中使用的几个变量。在第二步中,程序使用先前初始化的变量执行计算。第一步的变量不会以任何方式修改。第一步的计算时间并不重要,而是在第二步中,因为它以必要顺序称为几百次。以下是代码结构和IST输出的构造最小示例...

0  是否可以使用Namespaceproxy和BaseManager共享一个复杂的对象而无需锁定?  ( Is it possible to share a complex object with namespaceproxy and basemanager wit ) 
编辑: 我已经设法"删除" 其中一个锁,但它仍然很慢。有人知道锁定的位置吗? class NoLock: def __init__(self): pass def __enter__(self): return self def __exit__(self,...

3  如何修复nameError:Python中未定义全局名称错误?  ( How can fix nameerror global name is not defined error in python ) 
我写了这个python代码,用于使用线程乘以2个矩阵。但是我给出了此错误: AJAX5 我知道它是因为定义了全球A,B矩阵,但我在Python中新是新的,我不知道如何修复它。 我如何解决这个问题? AJAX6 ...

-2  如何获取多处理的MaxSize.queue  ( How to get the maxsize of multiprocessing queue ) 
与队列相反.queue似乎没有maxsize属性。当我尝试通过调用 queue_object.maxsize 我收到以下例外: AttributeError: 'Queue' object has no attribute 'maxsize' 。 是我身边的实现错误,还是它实际上不存在?如果它不存在另一种方法来获取此...

0  多进程类未在实际过程中存储数据  ( Multi process class is not storing data in the actual process ) 
我已经做出了我正在写的更大代码的例子。我想要多个进程来管理100个等级的线程。 我有两个问题,一个是"添加" 方法似乎实际增加了新过程。另一个是,即使是创建的2,3或4个进程,即使是创建的,线程仍然在第一个主要,过程下启动。 以下代码没有显示线程类,但如果您可以帮助解释为什么进程无法正确添加,我可以弄清楚线程部分。 ...

0  访问分配在多处理函数中的变量  ( Accessing a variable assigned in multiprocessed function ) 
我正在制作一个网站,并在启动时,我想启动另一个开始加载嵌入模型的进程,因为这需要很长时间,并且用户最终将需要长时间。这是我的代码: from flask import Flask, render_template from flask_socketio import SocketIO, send import b...

3  如何从Python中的多个进程控制对文件的访问  ( How to control access to a file from multiple processes in python ) 
我暂时地找到了以下多处理问题的解决方案。 我在record.py模块中有一个类记录。记录类的责任是处理输入数据并将其保存到JSON文件中。 记录类具有方法put()更新JSON文件。 记录类是在类装饰器中初始化的。装饰器应用于大多数各种子模块的类。 装饰器提取其装饰的每个方法的信息,并将数据发送到Put()录制类方...

2  pyro4 rpc阻止  ( Pyro4 rpc blocking ) 
我目前正在在高性能数字计算系统上进行开发,将其发布到Web服务器。我们在单独的过程上使用烧瓶来服务网页,以保持一切交互式,并使用WebSockets将数据发送到JS绘图仪。计算也使用多处理来拆分以进行性能原因。 我们目前正在使用pyro4从服务器计算计算中更新的参数。但是,如果每秒的更新数在我们的Pyro4设置对象上...

3  使用Apply_Async与回调函数进行流程池  ( Using apply async with callback function for a pool of processes ) 
我试图了解多处理池是如何工作的。在以下编程中,我创建了一个4个进程的池。 和我调用 apply_async ,带回调函数,应该更新名为 result_list 的列表 import Queue from multiprocessing import Process from multiprocessing i...

1  Python多处理管理器和复合模式共享  ( Python multiprocessing manager composite pattern sharing ) 
我正在尝试通过多处理管理器共享复合结构,但是在尝试仅使用其中一个复合类方法时,我感到陷入困境时遇到" runtimeerror:最大递归深度超过" 。 类是来自 code.activestate 并由我在包容之前测试进入经理。 在将类中检索到一个进程并调用它的 addchild()方法时,我保留了 runtimee...

2  使用Windows中的信号中断Python MultiProcessing.Process  ( Interrupt python multiprocessing process using signals in windows ) 
我有一个python脚本,它使用多处理_process类生成一个新进程。这个过程应该永远运行来监控东西。在UNIX上,我现在可以使用OS.KILL()向该特定进程和信号发送信号。在该过程中的信号(...)来实现我的特定中断处理程序。在Windows的事情上不起作用。 我读取了如何使用 popen 。我还可以为进程类指...

0  ProcessPoolExecutor在将脚本运行为EXEC文件时启动N新窗口  ( Processpoolexecutor launch n new windows when running script as exec file ) 
我有一个脚本,在一个线程和计算中启动pyqt gui(多处理计算)。问题是当我将.py文件转换为.exe文件(使用pyinstaller)来自"如果 name ==' main ''和此行为的每个新创建的进程启动命令我只能从.exe文件中重现(从.py文件启动脚本时它的工作正确) 有人遇到过同样的问题吗? 您可以通...

0  多处理两个用于循环的  ( Multiprocessing of two for loops ) 
我正在努力实现Python(2.7)中的算法,以并行化物理问题的计算。有两个变量的参数空间(让我们说a和b)我想运行我的书面程序f(a,b),返回其他两个变量c和d。 到目前为止,我用两个 for - over a和b,计算了c和d的两个阵列,然后将其保存为txt文档。由于参数空间相对较大,并且每次计算点f(a...

1  如何在Web服务器(Python)中实现多处理?  ( How to implement multi processing in a web server python ) 
我一直在遵循一个教程来编写python webserver: ruslanspivak.com/lsbaws-part3/。 Python Web-Server有一个简单的代码,它应该使用多处理来处理请求 import os import socket import time SERVER_ADDRESS =...

3  Flask + SSE:为什么是Time.Sleep()需要?  ( Flask sse why is time sleep required ) 
我正在构建一个将具有后台进程的烧瓶应用程序(使用 998887664 )轮询Web服务进行新数据,排队数据,以使服务器发送事件端点可以流式传输到客户。 示例代码是: #!/usr/bin/env python from __future__ import print_function import itertoo...

1  Python多处理不是将示例代码打印到stdout  ( Python multiprocessing not printing example code to stdout ) 
我正准确地从python 多处理过程文档 。 from multiprocessing import Process def f(name): print('hello', name) if __name__ == '__main__': p = Process(target=f, args=...

1  为什么单个进程可以在Windows子系统上实现100%的多个CPU使用,用于Linux(WSL),但它不能在服务器上ubuntu?  ( Why a single process can achieve multiple cpu usage of 100 on windows subsystem ) 
我想通过Python多处理模块实现并行计算,因此我实现了模拟计算来测试我是否可以使用多个CPU内核。我发现一个非常奇怪的事情是一个过程可以在我的桌面上为Linux(WSL)的Windows子系统实现8个CPU使用率100%,而不是在Lab的服务器上只有一个CPU使用率100%。 像这样: ,这是对比度: ...

0  如何并行使用发电机读取视频?  ( How to read video with generator in parallel ) 
我对一般的多加管真的很多。 我有以下代码,从给定视频生成一系列帧: def read_video(filename: str) -> np.array: w, h = get_resolution(filename) command = [ "ffmpeg", "...

0  为什么池比相同数量的流程慢  ( Why is pool slower than the same number of processes ) 
我最近尝试将一些平行的进程重构为游泳池,惊讶于池几乎花了几乎是纯粹的过程。请假设它们在同一台机器上运行,具有相同数量的核心。我希望有人可以解释为什么我的实现使用池的实现更长,也许提供一些建议: 共享依赖项: https://github.com/taynaud/python-louvain from co...

14  芹菜:守护进程不允许生孩子  ( Celery daemonic processes are not allowed to have children ) 
在Python(2.7)中,我尝试在Celery任务中创建进程(具有多处理)(Celery 3.1.17),但它给出了错误: daemonic processes are not allowed to have children googling它,我发现最新版本的台球版本修复了"错误" ,但我有最新版本...

4  用sqlalchemy处理跨多个模块的scoped_session  ( Handling scoped session across multiple modules with sqlalchemy ) 
我是newbie使用sqlalchemy,我正在研究一个复杂的ETL过程,所以我做了下面的简化代码: module1.py class Foo: def foo_method(self): # doing stuff with database module2.py ...

2  使用线程更改代码的结果吗?  ( Does using thread alter the result of your code ) 
所以我写了一个函数,何时调用何时返回字典。这看起来像 def path(image_file): Some function that returns a dictionary return co_ordinates def Draw(): image_file = "Data/t...

0  多处理酸洗错误:_pickle.picklingError:无法在0x02b2d420>下泡沫<function myprocess>:它与__main __的对象不同。MyProcess  ( Multiprocessing pickling error pickle picklingerror cant pickle function my ) 
我正在阅读和应用Python Book的代码,我不能在简单示例中使用多处理,您可以在下面看到: import multiprocessing def myProcess(): print("Currently Executing Child Process") print("This proc...

相关问题

0  Python多处理使TKinter GUI再次出现 
0  如何在Python中的进程的线程中访问管道连接对象?多处理 
0  Python多处理,变量的前初始化 
0  是否可以使用Namespaceproxy和BaseManager共享一个复杂的对象而无需锁定? 
3  如何修复nameError:Python中未定义全局名称错误? 
-2  如何获取多处理的MaxSize.queue 
0  多进程类未在实际过程中存储数据 
0  访问分配在多处理函数中的变量 
3  如何从Python中的多个进程控制对文件的访问 
2  pyro4 rpc阻止 
3  使用Apply_Async与回调函数进行流程池 
1  Python多处理管理器和复合模式共享 
2  使用Windows中的信号中断Python MultiProcessing.Process 
0  ProcessPoolExecutor在将脚本运行为EXEC文件时启动N新窗口 
0  多处理两个用于循环的 
1  如何在Web服务器(Python)中实现多处理? 
3  Flask + SSE:为什么是Time.Sleep()需要? 
1  Python多处理不是将示例代码打印到stdout 
1  为什么单个进程可以在Windows子系统上实现100%的多个CPU使用,用于Linux(WSL),但它不能在服务器上ubuntu? 
0  如何并行使用发电机读取视频? 
0  为什么池比相同数量的流程慢 
14  芹菜:守护进程不允许生孩子 
4  用sqlalchemy处理跨多个模块的scoped_session 
2  使用线程更改代码的结果吗? 
0  多处理酸洗错误:_pickle.picklingError:无法在0x02b2d420>下泡沫<function myprocess>:它与__main __的对象不同。MyProcess 



© 2021 it.wenda123.org All Rights Reserved. 问答之家 版权所有