[GRASS-SVN] r69507 - in grass/trunk/lib/python/pygrass/modules: . interface
svn_grass at osgeo.org
svn_grass at osgeo.org
Fri Sep 16 08:39:51 PDT 2016
Author: huhabla
Date: 2016-09-16 08:39:50 -0700 (Fri, 16 Sep 2016)
New Revision: 69507
Modified:
grass/trunk/lib/python/pygrass/modules/__init__.py
grass/trunk/lib/python/pygrass/modules/interface/__init__.py
grass/trunk/lib/python/pygrass/modules/interface/module.py
Log:
pygrass module: Added MultiModule class to put multiple modules into the parallel module queue as a single job, for example coupled g.region and r.mapcalc calls that must run in a specific order, while many of these combined jobs run in parallel.
Modified: grass/trunk/lib/python/pygrass/modules/__init__.py
===================================================================
--- grass/trunk/lib/python/pygrass/modules/__init__.py 2016-09-16 15:35:48 UTC (rev 69506)
+++ grass/trunk/lib/python/pygrass/modules/__init__.py 2016-09-16 15:39:50 UTC (rev 69507)
@@ -1,3 +1,3 @@
# -*- coding: utf-8 -*-
-from grass.pygrass.modules.interface import Module, ParallelModuleQueue
+from grass.pygrass.modules.interface import Module, MultiModule, ParallelModuleQueue
from grass.pygrass.modules import shortcuts
Modified: grass/trunk/lib/python/pygrass/modules/interface/__init__.py
===================================================================
--- grass/trunk/lib/python/pygrass/modules/interface/__init__.py 2016-09-16 15:35:48 UTC (rev 69506)
+++ grass/trunk/lib/python/pygrass/modules/interface/__init__.py 2016-09-16 15:39:50 UTC (rev 69507)
@@ -10,4 +10,4 @@
from grass.pygrass.modules.interface import typedict
from grass.pygrass.modules.interface import read
-from grass.pygrass.modules.interface.module import Module, ParallelModuleQueue
+from grass.pygrass.modules.interface.module import Module, MultiModule, ParallelModuleQueue
Modified: grass/trunk/lib/python/pygrass/modules/interface/module.py
===================================================================
--- grass/trunk/lib/python/pygrass/modules/interface/module.py 2016-09-16 15:35:48 UTC (rev 69506)
+++ grass/trunk/lib/python/pygrass/modules/interface/module.py 2016-09-16 15:39:50 UTC (rev 69507)
@@ -3,6 +3,7 @@
with_statement, print_function, unicode_literals)
import sys
from multiprocessing import cpu_count
+import threading
import time
from xml.etree.ElementTree import fromstring
@@ -94,6 +95,32 @@
0
0
+ Check MultiModule approach with three by two processes
+ >>> gregion = Module("g.region", flags="p", run_=False)
+ >>> queue = ParallelModuleQueue(nprocs=3)
+ >>> proc_list = []
+ >>> for i in xrange(3):
+ ... new_gregion = copy.deepcopy(gregion)
+ ... proc_list.append(new_gregion)
+ ... new_mapcalc = copy.deepcopy(mapcalc)
+ ... m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
+ ... proc_list.append(new_mapcalc)
+ ... mm = MultiModule(module_list=[new_gregion, new_mapcalc], finish=False)
+ ... queue.put(mm)
+ >>> queue.wait()
+ >>> queue.get_num_run_procs()
+ 0
+ >>> queue.get_max_num_procs()
+ 3
+ >>> for proc in proc_list:
+ ... print(proc.popen.returncode)
+ 0
+ 0
+ 0
+ 0
+ 0
+ 0
+
Check with a queue size of 8 and 4 processes
>>> queue = ParallelModuleQueue(nprocs=8)
@@ -194,9 +221,9 @@
To run the Module objects in parallel the run\_ and finish\_ options
of the Module must be set to False.
- :param module: a preconfigured Module object with run\_ and finish\_
- set to False
- :type module: Module object
+ :param module: a preconfigured Module object or a list of Module objects
+ with run\_ and finish\_ set to False,
+ :type module: Module object or list of Module objects
"""
self._list[self._proc_count] = module
# Force that finish is False, otherwise the execution
@@ -209,11 +236,11 @@
self.wait()
def get(self, num):
- """Get a Module object from the queue
+ """Get a Module object or list of Module objects from the queue
:param num: the number of the object in queue
:type num: int
- :returns: the Module object or None if num is not in the queue
+ :returns: the Module object or list of Module objects or None if num is not in the queue
"""
if num < self._num_procs:
return self._list[num]
@@ -251,13 +278,8 @@
"""
for proc in self._list:
if proc:
- stdout, stderr = proc.popen.communicate(input=proc.stdin)
- proc.outputs['stdout'].value = stdout if stdout else ''
- proc.outputs['stderr'].value = stderr if stderr else ''
+ proc.wait()
- if proc.popen.returncode != 0:
- GrassError(("Error running module %s") % (proc.name))
-
self._list = self._num_procs * [None]
self._proc_count = 0
@@ -322,11 +344,25 @@
>>> mapcalc.popen.returncode
0
+ >>> mapcalc = Module("r.mapcalc", expression="test_a = 1",
+ ... overwrite=True, run_=False, finish_=False)
+ >>> mapcalc.run()
+ Module('r.mapcalc')
+ >>> mapcalc.wait()
+ >>> mapcalc.popen.returncode
+ 0
+ >>> mapcalc.run()
+ Module('r.mapcalc')
+ >>> mapcalc.wait()
+ >>> mapcalc.popen.returncode
+ 0
+
>>> colors = Module("r.colors", map="test_a", rules="-",
... run_=False, stdout_=PIPE,
... stderr_=PIPE, stdin_="1 red")
>>> colors.run()
Module('r.colors')
+ >>> mapcalc.wait()
>>> colors.popen.returncode
0
>>> colors.inputs["stdin"].value
@@ -522,6 +558,8 @@
self.outputs['stderr'] = Parameter(diz=diz)
self.popen = None
self.time = None
+ self.start_time = None # This variable will be set in the run() function
+ self._finished = False # This variable is set True if wait() was successfully called
if args or kargs:
self.__call__(*args, **kargs)
@@ -680,40 +718,162 @@
def run(self):
    """Start the module process.

    The command line is built with make_cmd() and launched with Popen,
    using the stdin_, stdout_, stderr_ and env_ settings of this object.
    If finish_ is True, wait() is called before returning, so stdout,
    stderr and the run time are already collected. Otherwise this method
    returns right after starting the process and wait() must be called
    later to wait for the started process.

    :returns: this Module object
    """
    G_debug(1, self.get_bash())
    # A new process is about to start, so a previous wait() result is stale
    self._finished = False
    stdin_text = self.inputs['stdin'].value
    if stdin_text:
        # Text given through the stdin parameter is piped into the process
        self.stdin = stdin_text
        self.stdin_ = PIPE
    cmd = self.make_cmd()
    self.start_time = time.time()
    self.popen = Popen(cmd, stdin=self.stdin_, stdout=self.stdout_,
                       stderr=self.stderr_, env=self.env_)
    if self.finish_ is True:
        self.wait()
    return self

def wait(self):
    """Wait for the module to finish. Call this method if
    the run() call was performed with self.finish_ = False.

    On the first call after run(), this collects stdout and stderr with
    Popen.communicate() (feeding self.stdin to the process), stores them
    in self.outputs['stdout'] and self.outputs['stderr'] (empty strings
    when nothing was captured), and records the time elapsed since run()
    in self.time. Further calls do nothing until run() is called again.

    :raises CalledModuleError: if the process exited with a non-zero
                               return code; raised only by the call that
                               collects the process
    """
    if self._finished is False:
        stdout, stderr = self.popen.communicate(input=self.stdin)
        self.outputs['stdout'].value = stdout if stdout else ''
        self.outputs['stderr'].value = stderr if stderr else ''
        self.time = time.time() - self.start_time

        # Mark as finished before checking the return code, so a failed
        # module is reported once and repeated wait() calls do not try to
        # communicate with an already collected process again.
        self._finished = True

        # The check must stay inside this branch: stderr is only bound here
        if self.popen.poll():
            raise CalledModuleError(returncode=self.popen.returncode,
                                    code=self.get_bash(),
                                    module=self.name, errors=stderr)

class MultiModule(object):
    """This class is designed to run modules in serial in the provided order.
    Modules can be run in serial synchronously or asynchronously.

    - Synchronously: When calling run() all modules will run in serial order
      until they are finished and then return
    - Asynchronously: When calling run() all modules will run in serial
      order in a background thread, run() will return after starting the
      modules without waiting for them to finish. The user must call the
      wait() method to wait for the modules to finish. An exception raised
      by a module in the background thread is re-raised by wait().

    >>> from grass.pygrass.modules import Module
    >>> from grass.pygrass.modules import MultiModule
    >>> import threading
    >>> import copy

    Synchronous module run
    >>> region_1 = Module("g.region", run_=False)
    >>> region_1.flags.p = True
    >>> region_2 = copy.deepcopy(region_1)
    >>> region_2.flags.p = True
    >>> mm = MultiModule(module_list=[region_1, region_2])
    >>> mm.run()
    >>> m_list = mm.get_modules()
    >>> m_list[0].popen.returncode
    0
    >>> m_list[1].popen.returncode
    0

    Asynchronous module run, setting finish = False
    >>> region_1 = Module("g.region", run_=False)
    >>> region_1.flags.p = True
    >>> region_2 = copy.deepcopy(region_1)
    >>> region_2.flags.p = True
    >>> region_3 = copy.deepcopy(region_1)
    >>> region_3.flags.p = True
    >>> region_4 = copy.deepcopy(region_1)
    >>> region_4.flags.p = True
    >>> region_5 = copy.deepcopy(region_1)
    >>> region_5.flags.p = True
    >>> mm = MultiModule(module_list=[region_1, region_2, region_3, region_4, region_5],
    ...                  finish=False)
    >>> t = mm.run()
    >>> isinstance(t, threading.Thread)
    True
    >>> mm.wait()
    >>> m_list = mm.get_modules()
    >>> m_list[0].popen.returncode
    0
    >>> m_list[1].popen.returncode
    0
    >>> m_list[2].popen.returncode
    0
    >>> m_list[3].popen.returncode
    0
    >>> m_list[4].popen.returncode
    0

    """

    def __init__(self, module_list=None, finish=True):
        """Constructor of the multi module runner

        :param module_list: A list of preconfigured modules that should be
                            run by this object, in list order. If omitted,
                            a new empty list is created for this instance.
        :param finish: If set True the run() method will wait for all
                       processes to finish. If set False, the run() method
                       will return after starting the processes and the
                       wait() method must be called to finish the modules.
        """
        # A fresh list per instance: a mutable default argument would be
        # shared by every MultiModule created without a module_list.
        self.module_list = [] if module_list is None else module_list
        self.finish_ = finish
        self.t = None
        # Exception raised in the background thread, re-raised by wait()
        self._error = None

    def get_modules(self):
        """Return the list of modules

        :return: The list of modules
        """
        return self.module_list

    def run(self):
        """Start the modules in the list. If self.finish_ is set True
        this method will return after all processes finished.

        If self.finish_ is set False, this method will return
        after the process list was started for execution.
        In a background thread, the processes in the list will
        be run one after another. Call wait() to join that thread.

        :return: None in case of self.finish_ is True,
                 the Thread that runs the modules otherwise
        """
        if self.finish_ is True:
            for module in self.module_list:
                # Force each module to block until its process terminates,
                # so the next module only starts after the previous one.
                module.finish_ = True
                module.run()
            return None

        self._error = None
        self.t = threading.Thread(target=self._run_in_serial,
                                  args=[self.module_list])
        self.t.start()
        return self.t

    def _run_in_serial(self, module_list):
        """Thread target: run and wait for each module in list order.

        The first exception stops the remaining modules and is stored, so
        that wait() can re-raise it in the calling thread instead of it
        being lost inside the background thread.

        :param module_list: the modules to run, in execution order
        """
        try:
            for module in module_list:
                module.run()
                module.wait()
        except Exception as error:
            self._error = error

    def wait(self):
        """Wait for all processes to finish. Call this method if finish
        was set False.

        :raises: the exception raised by a module in the background thread,
                 if any; it is raised only once
        """
        if self.t:
            self.t.join()
        if self._error is not None:
            error, self._error = self._error, None
            raise error

def run_modules_in_serial(module_list):
    """Run the given modules one after another, in list order.

    Each module is started with run() and then waited for with wait()
    before the next one is started. An exception raised by run() or
    wait() propagates to the caller and the remaining modules are not run.

    :param module_list: iterable of objects providing run() and wait(),
                        presumably preconfigured Module objects
    """
    for module in module_list:
        module.run()
        module.wait()

###############################################################################
if __name__ == "__main__":
More information about the grass-commit
mailing list