[GRASS-SVN] r69508 - grass/trunk/lib/python/pygrass/modules/interface

svn_grass at osgeo.org svn_grass at osgeo.org
Fri Sep 16 15:20:28 PDT 2016


Author: huhabla
Date: 2016-09-16 15:20:28 -0700 (Fri, 16 Sep 2016)
New Revision: 69508

Modified:
   grass/trunk/lib/python/pygrass/modules/interface/module.py
Log:
pygrass modules: Implemented process based version of MultiModule class and enabled temporary region environment for multi module runs


Modified: grass/trunk/lib/python/pygrass/modules/interface/module.py
===================================================================
--- grass/trunk/lib/python/pygrass/modules/interface/module.py	2016-09-16 15:39:50 UTC (rev 69507)
+++ grass/trunk/lib/python/pygrass/modules/interface/module.py	2016-09-16 22:20:28 UTC (rev 69508)
@@ -2,13 +2,12 @@
 from __future__ import (nested_scopes, generators, division, absolute_import,
                         with_statement, print_function, unicode_literals)
 import sys
-from multiprocessing import cpu_count
-import threading
+from multiprocessing import cpu_count, Process, Queue
 import time
 from xml.etree.ElementTree import fromstring
 
 from grass.exceptions import CalledModuleError, GrassError, ParameterError
-from grass.script.core import Popen, PIPE
+from grass.script.core import Popen, PIPE, use_temp_region, del_temp_region
 from .docstring import docstring_property
 from .parameter import Parameter
 from .flag import Flag
@@ -28,10 +27,11 @@
 
 
 class ParallelModuleQueue(object):
-    """This class is designed to run an arbitrary number of pygrass Module
+    """This class is designed to run an arbitrary number of pygrass Module or MultiModule
     processes in parallel.
 
-    Objects of type grass.pygrass.modules.Module can be put into the
+    Objects of type grass.pygrass.modules.Module or
+    grass.pygrass.modules.MultiModule can be put into the
     queue using put() method. When the queue is full with the maximum
     number of parallel processes it will wait for all processes to finish,
     sets the stdout and stderr of the Module object and removes it
@@ -43,6 +43,10 @@
     This class will raise a GrassError in case a Module process exits
     with a return code other than 0.
 
+    Processes that were run asynchronously with the MultiModule class
+    will not raise a GrassError in case of failure. This must be manually checked
+    by accessing finished modules by calling get_finished_modules().
+
     Usage:
 
     Check with a queue size of 3 and 5 processes
@@ -61,6 +65,7 @@
     ...     m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
     ...     queue.put(m)
     >>> queue.wait()
+    >>> mapcalc_list = queue.get_finished_modules()
     >>> queue.get_num_run_procs()
     0
     >>> queue.get_max_num_procs()
@@ -83,6 +88,7 @@
     ...     m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
     ...     queue.put(m)
     >>> queue.wait()
+    >>> mapcalc_list = queue.get_finished_modules()
     >>> queue.get_num_run_procs()
     0
     >>> queue.get_max_num_procs()
@@ -95,7 +101,7 @@
     0
     0
 
-    Check MultiModule approach with three by two processes
+    Check MultiModule approach with three by two processes running in a background process
     >>> gregion = Module("g.region", flags="p", run_=False)
     >>> queue = ParallelModuleQueue(nprocs=3)
     >>> proc_list = []
@@ -105,9 +111,10 @@
     ...     new_mapcalc = copy.deepcopy(mapcalc)
     ...     m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
     ...     proc_list.append(new_mapcalc)
-    ...     mm = MultiModule(module_list=[new_gregion, new_mapcalc], finish=False)
+    ...     mm = MultiModule(module_list=[new_gregion, new_mapcalc], sync=False, set_temp_region=True)
     ...     queue.put(mm)
     >>> queue.wait()
+    >>> proc_list = queue.get_finished_modules()
     >>> queue.get_num_run_procs()
     0
     >>> queue.get_max_num_procs()
@@ -122,7 +129,6 @@
     0
 
     Check with a queue size of 8 and 4 processes
-
     >>> queue = ParallelModuleQueue(nprocs=8)
     >>> mapcalc_list = []
     >>> new_mapcalc = copy.deepcopy(mapcalc)
@@ -150,6 +156,7 @@
     >>> queue.get_num_run_procs()
     4
     >>> queue.wait()
+    >>> mapcalc_list = queue.get_finished_modules()
     >>> queue.get_num_run_procs()
     0
     >>> queue.get_max_num_procs()
@@ -190,6 +197,7 @@
     >>> queue.get_num_run_procs()
     1
     >>> queue.wait()
+    >>> mapcalc_list = queue.get_finished_modules()
     >>> queue.get_num_run_procs()
     0
     >>> queue.get_max_num_procs()
@@ -206,7 +214,7 @@
         """Constructor
 
         :param nprocs: The maximum number of Module processes that
-                       can be run in parallel, defualt is 1, if None
+                       can be run in parallel, default is 1, if None
                        then use all the available CPUs.
         :type nprocs: int
         """
@@ -214,16 +222,17 @@
         self._num_procs = nprocs
         self._list = nprocs * [None]
         self._proc_count = 0
+        self._finished_modules = []  # Store all processed modules in a list
 
     def put(self, module):
-        """Put the next Module object in the queue
+        """Put the next Module or MultiModule object in the queue
 
         To run the Module objects in parallel the run\_ and finish\_ options
         of the Module must be set to False.
 
-        :param module: a preconfigured Module object or a list of Module objects
+        :param module: a preconfigured Module or MultiModule object that was configured
                        with run\_ and finish\_ set to False,
-        :type module: Module object or list of Module objects
+        :type module: Module or MultiModule object
         """
         self._list[self._proc_count] = module
         # Force that finish is False, otherwise the execution
@@ -272,18 +281,31 @@
         self._num_procs = int(nprocs)
         self.wait()
 
+    def get_finished_modules(self):
+        """Return all finished processes that were run by this queue
+
+        :return: A list of Module objects
+        """
+        return self._finished_modules
+
     def wait(self):
         """Wait for all Module processes that are in the list to finish
         and set the modules stdout and stderr output options
+
+        :return: A list of modules that were run
         """
         for proc in self._list:
             if proc:
-                proc.wait()
+                if isinstance(proc, Module):
+                    self._finished_modules.extend([proc.wait(),])
+                else:
+                    self._finished_modules.extend(proc.wait())
 
         self._list = self._num_procs * [None]
         self._proc_count = 0
 
 
+
 class Module(object):
     """This class is design to wrap/run/interact with the GRASS modules.
 
@@ -348,13 +370,13 @@
     ...                  overwrite=True, run_=False, finish_=False)
     >>> mapcalc.run()
     Module('r.mapcalc')
-    >>> mapcalc.wait()
-    >>> mapcalc.popen.returncode
+    >>> p = mapcalc.wait()
+    >>> p.popen.returncode
     0
     >>> mapcalc.run()
     Module('r.mapcalc')
-    >>> mapcalc.wait()
-    >>> mapcalc.popen.returncode
+    >>> p = mapcalc.wait()
+    >>> p.popen.returncode
     0
 
     >>> colors = Module("r.colors", map="test_a", rules="-",
@@ -362,8 +384,8 @@
     ...                 stderr_=PIPE, stdin_="1 red")
     >>> colors.run()
     Module('r.colors')
-    >>> mapcalc.wait()
-    >>> colors.popen.returncode
+    >>> p = mapcalc.wait()
+    >>> p.popen.returncode
     0
     >>> colors.inputs["stdin"].value
     u'1 red'
@@ -722,6 +744,8 @@
         finish_==True and sets up stdout and stderr. If finish_==False this
         function will return after starting the process. Use wait() to wait for
         the started process
+
+        :return: A reference to this object
         """
         G_debug(1, self.get_bash())
         self._finished = False
@@ -745,6 +769,8 @@
     def wait(self):
         """Wait for the module to finish. Call this method if
         the run() call was performed with self.false_ = False.
+
+        :return: A reference to this object
         """
         if self._finished is False:
             stdout, stderr = self.popen.communicate(input=self.stdin)
@@ -759,20 +785,33 @@
                                         code=self.get_bash(),
                                         module=self.name, errors=stderr)
 
+        return self
 
+
 class MultiModule(object):
-    """This class is designed to run modules in serial in the provided order.
-    Module can be run in serial  synchronously or asynchronously.
+    """This class is designed to run a list of modules in serial in the provided order.
 
+    Module can be run in serial synchronously or asynchronously.
+
     - Synchronously:  When calling run() all modules will run in serial order
-                      until they are finished and then return
-    - Asynchronously: When calling run() all modules will run in serial order in a background thread,
+                      until they are finished and then return. The modules can be accessed by
+                      calling get_modules().
+    - Asynchronously: When calling run() all modules will run in serial order in a background process,
                       run() will return after starting the modules without waiting for them to finish.
                       The user must call the wait() method to wait for the modules to finish.
+                      Asynchronously called modules can optionally be run in a temporary region.
 
+                      Note:
+
+                          Modules run in asynchronous mode can only be accessed via the wait() method.
+                          The wait() method will return all finished modules as a list.
+
+    This class can be passed to the ParallelModuleQueue to run lists of modules in parallel. This
+    is meaningful if region settings must be applied to each parallel module run.
+
     >>> from grass.pygrass.modules import Module
     >>> from grass.pygrass.modules import MultiModule
-    >>> import threading
+    >>> from multiprocessing import Process
     >>> import copy
 
     Synchronous module run
@@ -800,12 +839,11 @@
     >>> region_5 = copy.deepcopy(region_1)
     >>> region_5.flags.p = True
     >>> mm = MultiModule(module_list=[region_1, region_2, region_3, region_4, region_5],
-    ...                  finish=False)
+    ...                  sync=False)
     >>> t = mm.run()
-    >>> isinstance(t, threading.Thread)
+    >>> isinstance(t, Process)
     True
-    >>> mm.wait()
-    >>> m_list = mm.get_modules()
+    >>> m_list = mm.wait()
     >>> m_list[0].popen.returncode
     0
     >>> m_list[1].popen.returncode
@@ -817,23 +855,55 @@
     >>> m_list[4].popen.returncode
     0
 
+    Asynchronous module run, setting sync = False and using a temporary region
+    >>> mm = MultiModule(module_list=[region_1, region_2, region_3, region_4, region_5],
+    ...                  sync=False, set_temp_region=True)
+    >>> str(mm)
+    'g.region -p ; g.region -p ; g.region -p ; g.region -p ; g.region -p'
+    >>> t = mm.run()
+    >>> isinstance(t, Process)
+    True
+    >>> m_list = mm.wait()
+    >>> m_list[0].popen.returncode
+    0
+    >>> m_list[1].popen.returncode
+    0
+    >>> m_list[2].popen.returncode
+    0
+    >>> m_list[3].popen.returncode
+    0
+    >>> m_list[4].popen.returncode
+    0
+
     """
 
-    def __init__(self, module_list=[], finish=True):
-        """Konstruktor of the multi module runner
+    def __init__(self, module_list, sync=True, set_temp_region=False):
+        """Constructor of the multi module class
 
-        :param module_list: A list of preconfigured modules that should be run by this module
-        :param finish: If set True the run() method will wait for all processes to finish,
-                       If set False, the run() method will return after starting the processes.
-                       The wait() method must be called to finish the modules.
+        :param module_list: A list of pre-configured modules that should be run by this module
+        :param sync: If set True the run() method will wait for all processes to finish -> synchronously run.
+                     If set False, the run() method will return after starting the processes -> asynchronously run.
+                     The wait() method must be called to finish the modules.
+        :param set_temp_region: Set a temporary region in which the modules should be run, hence
+                                region settings in the process list will not affect the current
+                                computation region.
         :return:
         """
         self.module_list = module_list
-        self.finish_ = finish
-        self.t = None
+        self.set_temp_region = set_temp_region
+        self.finish_ = sync      # We use the same variable name as Module
+        self.p = None
+        self.q = Queue()
 
+    def __str__(self):
+        """Return the command string that can be executed in a shell"""
+        return ' ; '.join(str(string) for string in self.module_list)
+
     def get_modules(self):
-        """Return the list of modules
+        """Return the list of modules that have been run in synchronous mode
+
+        Note: Asynchronously run modules can only be accessed via the wait() method.
+
         :return: The list of modules
         """
         return self.module_list
@@ -844,10 +914,10 @@
 
         If self.finish_ is set False, this method will return
         after the process list was started for execution.
-        In a background thread, the processes in the list will
+        In a background process, the processes in the list will
         be run one after the another.
 
-        :return: None in case of self.finish_ is True, Thread that runs the module otherwise
+        :return: None in case of self.finish_ is True, Process object that runs the module otherwise
         """
 
         if self.finish_ is True:
@@ -856,24 +926,61 @@
                 module.run()
             return None
         else:
-            self.t = threading.Thread(target=run_modules_in_serial,
-                                      args=[self.module_list])
-            self.t.start()
+            if self.set_temp_region is True:
+                self.p = Process(target=run_modules_in_temp_region,
+                                 args=[self.module_list, self.q])
+            else:
+                self.p = Process(target=run_modules,
+                                 args=[self.module_list, self.q])
+            self.p.start()
 
-            return self.t
+            return self.p
 
     def wait(self):
-        """Wait for all processes to finish. Call this method if finished was set False
+        """Wait for all processes to finish. Call this method if finished was set False.
+
+        :return: The process list with finished processes to check their return states
         """
-        if self.t:
-            self.t.join()
-            
-def run_modules_in_serial(module_list):
-    for proc in module_list:
-        proc.run()
-        proc.wait()
-    return
+        if self.p:
+            proc_list = self.q.get()
+            self.p.join()
 
+            return proc_list
+
+
+def run_modules_in_temp_region(module_list, q):
+    """Run the modules in a temporary region
+
+    :param module_list: The list of modules to run in serial
+    :param q: The process queue to put the finished process list
+    """
+    use_temp_region()
+    try:
+        for proc in module_list:
+            proc.run()
+            proc.wait()
+    except:
+        raise
+    finally:
+        q.put(module_list)
+        del_temp_region()
+
+
+def run_modules(module_list, q):
+    """Run the modules
+
+    :param module_list: The list of modules to run in serial
+    :param q: The process queue to put the finished process list
+    """
+    try:
+        for proc in module_list:
+            proc.run()
+            proc.wait()
+    except:
+        raise
+    finally:
+        q.put(module_list)
+
 ###############################################################################
 
 if __name__ == "__main__":



More information about the grass-commit mailing list