[GRASS-SVN] r62711 - grass/trunk/lib/python/pygrass/modules/interface

svn_grass at osgeo.org svn_grass at osgeo.org
Tue Nov 11 15:36:14 PST 2014


Author: huhabla
Date: 2014-11-11 15:36:14 -0800 (Tue, 11 Nov 2014)
New Revision: 62711

Modified:
   grass/trunk/lib/python/pygrass/modules/interface/module.py
Log:
pygrass: More tests and fixed function for the ParallelModuleQueue


Modified: grass/trunk/lib/python/pygrass/modules/interface/module.py
===================================================================
--- grass/trunk/lib/python/pygrass/modules/interface/module.py	2014-11-11 20:55:57 UTC (rev 62710)
+++ grass/trunk/lib/python/pygrass/modules/interface/module.py	2014-11-11 23:36:14 UTC (rev 62711)
@@ -67,29 +67,57 @@
     number of parallel processes it will wait for all processes to finish,
     sets the stdout and stderr of the Module object and removes it
     from the queue when its finished.
+    
+    To finish the queue before the maximum number of parallel
+    processes is reached, call wait().
 
     This class will raise a GrassError in case a Module process exits
     with a return code other than 0.
 
     Usage:
 
+    Check with a queue size of 3 and 5 processes
+    
     >>> import copy
     >>> from grass.pygrass.modules import Module, ParallelModuleQueue
     >>> mapcalc_list = []
+    
+    Setting run_ to False is important; otherwise parallel processing is not possible
+    
     >>> mapcalc = Module("r.mapcalc", overwrite=True, run_=False)
     >>> queue = ParallelModuleQueue(nprocs=3)
     >>> for i in xrange(5):
     ...     new_mapcalc = copy.deepcopy(mapcalc)
     ...     mapcalc_list.append(new_mapcalc)
-    ...     new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
-    ...     queue.put(new_mapcalc)
-    Module('r.mapcalc')
-    Module('r.mapcalc')
-    Module('r.mapcalc')
-    Module('r.mapcalc')
-    Module('r.mapcalc')
+    ...     m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
+    ...     queue.put(m)
+    >>> queue.wait()
+    >>> queue.get_num_run_procs()
+    0
+    >>> queue.get_max_num_procs()
+    3
+    >>> for mapcalc in mapcalc_list:
+    ...     print(mapcalc.popen.returncode)
+    0
+    0
+    0
+    0
+    0
 
+    Check with a queue size of 8 and 5 processes
+    
+    >>> queue = ParallelModuleQueue(nprocs=8)
+    >>> mapcalc_list = []
+    >>> for i in xrange(5):
+    ...     new_mapcalc = copy.deepcopy(mapcalc)
+    ...     mapcalc_list.append(new_mapcalc)
+    ...     m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
+    ...     queue.put(m)
     >>> queue.wait()
+    >>> queue.get_num_run_procs()
+    0
+    >>> queue.get_max_num_procs()
+    8
     >>> for mapcalc in mapcalc_list:
     ...     print(mapcalc.popen.returncode)
     0
@@ -98,6 +126,86 @@
     0
     0
 
+    Check with a queue size of 8 and 4 processes
+    
+    >>> queue = ParallelModuleQueue(nprocs=8)
+    >>> mapcalc_list = []
+    >>> new_mapcalc = copy.deepcopy(mapcalc)
+    >>> mapcalc_list.append(new_mapcalc)
+    >>> m = new_mapcalc(expression="test_pygrass_1 =1")
+    >>> queue.put(m)
+    >>> queue.get_num_run_procs()
+    1
+    >>> new_mapcalc = copy.deepcopy(mapcalc)
+    >>> mapcalc_list.append(new_mapcalc)
+    >>> m = new_mapcalc(expression="test_pygrass_2 =2")
+    >>> queue.put(m)
+    >>> queue.get_num_run_procs()
+    2
+    >>> new_mapcalc = copy.deepcopy(mapcalc)
+    >>> mapcalc_list.append(new_mapcalc)
+    >>> m = new_mapcalc(expression="test_pygrass_3 =3")
+    >>> queue.put(m)
+    >>> queue.get_num_run_procs()
+    3
+    >>> new_mapcalc = copy.deepcopy(mapcalc)
+    >>> mapcalc_list.append(new_mapcalc)
+    >>> m = new_mapcalc(expression="test_pygrass_4 =4")
+    >>> queue.put(m)
+    >>> queue.get_num_run_procs()
+    4
+    >>> queue.wait()
+    >>> queue.get_num_run_procs()
+    0
+    >>> queue.get_max_num_procs()
+    8
+    >>> for mapcalc in mapcalc_list:
+    ...     print(mapcalc.popen.returncode)
+    0
+    0
+    0
+    0
+
+    Check with a queue size of 3 and 4 processes
+
+    >>> queue = ParallelModuleQueue(nprocs=3)
+    >>> mapcalc_list = []
+    >>> new_mapcalc = copy.deepcopy(mapcalc)
+    >>> mapcalc_list.append(new_mapcalc)
+    >>> m = new_mapcalc(expression="test_pygrass_1 =1")
+    >>> queue.put(m)
+    >>> queue.get_num_run_procs()
+    1
+    >>> new_mapcalc = copy.deepcopy(mapcalc)
+    >>> mapcalc_list.append(new_mapcalc)
+    >>> m = new_mapcalc(expression="test_pygrass_2 =2")
+    >>> queue.put(m)
+    >>> queue.get_num_run_procs()
+    2
+    >>> new_mapcalc = copy.deepcopy(mapcalc)
+    >>> mapcalc_list.append(new_mapcalc)
+    >>> m = new_mapcalc(expression="test_pygrass_3 =3")
+    >>> queue.put(m) # Now it will wait until all procs finish and set the counter back to 0
+    >>> queue.get_num_run_procs()
+    0
+    >>> new_mapcalc = copy.deepcopy(mapcalc)
+    >>> mapcalc_list.append(new_mapcalc)
+    >>> m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
+    >>> queue.put(m)
+    >>> queue.get_num_run_procs()
+    1
+    >>> queue.wait()
+    >>> queue.get_num_run_procs()
+    0
+    >>> queue.get_max_num_procs()
+    3
+    >>> for mapcalc in mapcalc_list:
+    ...     print(mapcalc.popen.returncode)
+    0
+    0
+    0
+    0
+
     """
     def __init__(self, nprocs=1):
         """Constructor
@@ -147,13 +255,14 @@
         """Get the number of Module processes that are in the queue running
         or finished
 
-        :returns: the maximum number fo Module processes running/finished in
-                  the queue
+        :returns: the number of Module processes running/finished in the queue
         """
-        return len(self._list)
+        return self._proc_count 
 
     def get_max_num_procs(self):
         """Return the maximum number of parallel Module processes
+        
+        :returns: the maximum number of parallel Module processes
         """
         return self._num_procs
 



More information about the grass-commit mailing list