[GRASS-SVN] r69507 - in grass/trunk/lib/python/pygrass/modules: . interface

svn_grass at osgeo.org svn_grass at osgeo.org
Fri Sep 16 08:39:51 PDT 2016


Author: huhabla
Date: 2016-09-16 08:39:50 -0700 (Fri, 16 Sep 2016)
New Revision: 69507

Modified:
   grass/trunk/lib/python/pygrass/modules/__init__.py
   grass/trunk/lib/python/pygrass/modules/interface/__init__.py
   grass/trunk/lib/python/pygrass/modules/interface/module.py
Log:
pygrass module: Added MultiModule class to put multiple modules into the parallel module queue, for example coupled g.region and r.mapcalc calls, that must be run in a specific order but many of these combined calls should run in parallel


Modified: grass/trunk/lib/python/pygrass/modules/__init__.py
===================================================================
--- grass/trunk/lib/python/pygrass/modules/__init__.py	2016-09-16 15:35:48 UTC (rev 69506)
+++ grass/trunk/lib/python/pygrass/modules/__init__.py	2016-09-16 15:39:50 UTC (rev 69507)
@@ -1,3 +1,3 @@
 # -*- coding: utf-8 -*-
-from grass.pygrass.modules.interface import Module, ParallelModuleQueue
+from grass.pygrass.modules.interface import Module, MultiModule, ParallelModuleQueue
 from grass.pygrass.modules import shortcuts

Modified: grass/trunk/lib/python/pygrass/modules/interface/__init__.py
===================================================================
--- grass/trunk/lib/python/pygrass/modules/interface/__init__.py	2016-09-16 15:35:48 UTC (rev 69506)
+++ grass/trunk/lib/python/pygrass/modules/interface/__init__.py	2016-09-16 15:39:50 UTC (rev 69507)
@@ -10,4 +10,4 @@
 from grass.pygrass.modules.interface import typedict
 from grass.pygrass.modules.interface import read
 
-from grass.pygrass.modules.interface.module import Module, ParallelModuleQueue
+from grass.pygrass.modules.interface.module import Module, MultiModule, ParallelModuleQueue

Modified: grass/trunk/lib/python/pygrass/modules/interface/module.py
===================================================================
--- grass/trunk/lib/python/pygrass/modules/interface/module.py	2016-09-16 15:35:48 UTC (rev 69506)
+++ grass/trunk/lib/python/pygrass/modules/interface/module.py	2016-09-16 15:39:50 UTC (rev 69507)
@@ -3,6 +3,7 @@
                         with_statement, print_function, unicode_literals)
 import sys
 from multiprocessing import cpu_count
+import threading
 import time
 from xml.etree.ElementTree import fromstring
 
@@ -94,6 +95,32 @@
     0
     0
 
+    Check MultiModule approach with three by two processes
+    >>> gregion = Module("g.region", flags="p", run_=False)
+    >>> queue = ParallelModuleQueue(nprocs=3)
+    >>> proc_list = []
+    >>> for i in xrange(3):
+    ...     new_gregion = copy.deepcopy(gregion)
+    ...     proc_list.append(new_gregion)
+    ...     new_mapcalc = copy.deepcopy(mapcalc)
+    ...     m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
+    ...     proc_list.append(new_mapcalc)
+    ...     mm = MultiModule(module_list=[new_gregion, new_mapcalc], finish=False)
+    ...     queue.put(mm)
+    >>> queue.wait()
+    >>> queue.get_num_run_procs()
+    0
+    >>> queue.get_max_num_procs()
+    3
+    >>> for proc in proc_list:
+    ...     print(proc.popen.returncode)
+    0
+    0
+    0
+    0
+    0
+    0
+
     Check with a queue size of 8 and 4 processes
 
     >>> queue = ParallelModuleQueue(nprocs=8)
@@ -194,9 +221,9 @@
         To run the Module objects in parallel the run\_ and finish\_ options
         of the Module must be set to False.
 
-        :param module: a preconfigured Module object with run\_ and finish\_
-                       set to False
-        :type module: Module object
+        :param module: a preconfigured Module object or a list of Module objects
+                       with run\_ and finish\_ set to False,
+        :type module: Module object or list of Module objects
         """
         self._list[self._proc_count] = module
         # Force that finish is False, otherwise the execution
@@ -209,11 +236,11 @@
             self.wait()
 
     def get(self, num):
-        """Get a Module object from the queue
+        """Get a Module object or list of Module objects from the queue
 
         :param num: the number of the object in queue
         :type num: int
-        :returns: the Module object or None if num is not in the queue
+        :returns: the Module object or list of Module objects or None if num is not in the queue
         """
         if num < self._num_procs:
             return self._list[num]
@@ -251,13 +278,8 @@
         """
         for proc in self._list:
             if proc:
-                stdout, stderr = proc.popen.communicate(input=proc.stdin)
-                proc.outputs['stdout'].value = stdout if stdout else ''
-                proc.outputs['stderr'].value = stderr if stderr else ''
+                proc.wait()
 
-                if proc.popen.returncode != 0:
-                    GrassError(("Error running module %s") % (proc.name))
-
         self._list = self._num_procs * [None]
         self._proc_count = 0
 
@@ -322,11 +344,25 @@
     >>> mapcalc.popen.returncode
     0
 
+    >>> mapcalc = Module("r.mapcalc", expression="test_a = 1",
+    ...                  overwrite=True, run_=False, finish_=False)
+    >>> mapcalc.run()
+    Module('r.mapcalc')
+    >>> mapcalc.wait()
+    >>> mapcalc.popen.returncode
+    0
+    >>> mapcalc.run()
+    Module('r.mapcalc')
+    >>> mapcalc.wait()
+    >>> mapcalc.popen.returncode
+    0
+
     >>> colors = Module("r.colors", map="test_a", rules="-",
     ...                 run_=False, stdout_=PIPE,
     ...                 stderr_=PIPE, stdin_="1 red")
     >>> colors.run()
     Module('r.colors')
+    >>> mapcalc.wait()
     >>> colors.popen.returncode
     0
     >>> colors.inputs["stdin"].value
@@ -522,6 +558,8 @@
         self.outputs['stderr'] = Parameter(diz=diz)
         self.popen = None
         self.time = None
+        self.start_time = None            # This variable will be set in the run() function
+        self._finished = False            # This variable is set True if wait() was successfully called
 
         if args or kargs:
             self.__call__(*args, **kargs)
@@ -680,40 +718,162 @@
 
     def run(self):
         """Run the module
-
-        :param node:
-        :type node:
-
         This function will wait for the process to terminate in case
         finish_==True and sets up stdout and stderr. If finish_==False this
-        function will return after starting the process. Use
-        self.popen.communicate() of self.popen.wait() to wait for the process
-        termination. The handling of stdout and stderr must then be done
-        outside of this function.
+        function will return after starting the process. Use wait() to wait for
+        the started process
         """
         G_debug(1, self.get_bash())
+        self._finished = False
         if self.inputs['stdin'].value:
             self.stdin = self.inputs['stdin'].value
             self.stdin_ = PIPE
 
         cmd = self.make_cmd()
-        start = time.time()
+        self.start_time = time.time()
         self.popen = Popen(cmd,
                            stdin=self.stdin_,
                            stdout=self.stdout_,
                            stderr=self.stderr_,
                            env=self.env_)
-        if self.finish_:
+
+        if self.finish_ is True:
+            self.wait()
+
+        return self
+
+    def wait(self):
+        """Wait for the module to finish. Call this method if
+        the run() call was performed with self.finish_ = False.
+        """
+        if self._finished is False:
             stdout, stderr = self.popen.communicate(input=self.stdin)
             self.outputs['stdout'].value = stdout if stdout else ''
             self.outputs['stderr'].value = stderr if stderr else ''
-            self.time = time.time() - start
+            self.time = time.time() - self.start_time
+
+            self._finished = True
+
             if self.popen.poll():
                 raise CalledModuleError(returncode=self.popen.returncode,
                                         code=self.get_bash(),
                                         module=self.name, errors=stderr)
-        return self
 
+
+class MultiModule(object):
+    """This class is designed to run modules in serial in the provided order.
+    Modules can be run in serial, either synchronously or asynchronously.
+
+    - Synchronously:  When calling run() all modules will run in serial order
+                      until they are finished and then return
+    - Asynchronously: When calling run() all modules will run in serial order in a background thread,
+                      run() will return after starting the modules without waiting for them to finish.
+                      The user must call the wait() method to wait for the modules to finish.
+
+    >>> from grass.pygrass.modules import Module
+    >>> from grass.pygrass.modules import MultiModule
+    >>> import threading
+    >>> import copy
+
+    Synchronous module run
+    >>> region_1 = Module("g.region", run_=False)
+    >>> region_1.flags.p = True
+    >>> region_2 = copy.deepcopy(region_1)
+    >>> region_2.flags.p = True
+    >>> mm = MultiModule(module_list=[region_1, region_2])
+    >>> mm.run()
+    >>> m_list = mm.get_modules()
+    >>> m_list[0].popen.returncode
+    0
+    >>> m_list[1].popen.returncode
+    0
+
+    Asynchronous module run, setting finish = False
+    >>> region_1 = Module("g.region", run_=False)
+    >>> region_1.flags.p = True
+    >>> region_2 = copy.deepcopy(region_1)
+    >>> region_2.flags.p = True
+    >>> region_3 = copy.deepcopy(region_1)
+    >>> region_3.flags.p = True
+    >>> region_4 = copy.deepcopy(region_1)
+    >>> region_4.flags.p = True
+    >>> region_5 = copy.deepcopy(region_1)
+    >>> region_5.flags.p = True
+    >>> mm = MultiModule(module_list=[region_1, region_2, region_3, region_4, region_5],
+    ...                  finish=False)
+    >>> t = mm.run()
+    >>> isinstance(t, threading.Thread)
+    True
+    >>> mm.wait()
+    >>> m_list = mm.get_modules()
+    >>> m_list[0].popen.returncode
+    0
+    >>> m_list[1].popen.returncode
+    0
+    >>> m_list[2].popen.returncode
+    0
+    >>> m_list[3].popen.returncode
+    0
+    >>> m_list[4].popen.returncode
+    0
+
+    """
+
+    def __init__(self, module_list=[], finish=True):
+        """Constructor of the multi module runner
+
+        :param module_list: A list of preconfigured modules that should be run by this module
+        :param finish: If set True the run() method will wait for all processes to finish,
+                       If set False, the run() method will return after starting the processes.
+                       The wait() method must be called to finish the modules.
+        :return:
+        """
+        self.module_list = module_list
+        self.finish_ = finish
+        self.t = None
+
+    def get_modules(self):
+        """Return the list of modules
+        :return: The list of modules
+        """
+        return self.module_list
+
+    def run(self):
+        """Start the modules in the list. If self.finish_ is set True
+        this method will return after all processes have finished.
+
+        If self.finish_ is set False, this method will return
+        after the process list was started for execution.
+        In a background thread, the processes in the list will
+        be run one after another.
+
+        :return: None in case of self.finish_ is True, Thread that runs the module otherwise
+        """
+
+        if self.finish_ is True:
+            for module in self.module_list:
+                module.finish_ = True
+                module.run()
+            return None
+        else:
+            self.t = threading.Thread(target=run_modules_in_serial,
+                                      args=[self.module_list])
+            self.t.start()
+
+            return self.t
+
+    def wait(self):
+        """Wait for all processes to finish. Call this method if finish was set False
+        """
+        if self.t:
+            self.t.join()
+            
+def run_modules_in_serial(module_list):
+    for proc in module_list:
+        proc.run()
+        proc.wait()
+    return
+
 ###############################################################################
 
 if __name__ == "__main__":



More information about the grass-commit mailing list