[GRASS-SVN] r62863 - grass/branches/releasebranch_7_0/lib/python/pygrass/modules/interface
svn_grass at osgeo.org
svn_grass at osgeo.org
Sat Nov 22 14:29:06 PST 2014
Author: huhabla
Date: 2014-11-22 14:29:06 -0800 (Sat, 22 Nov 2014)
New Revision: 62863
Modified:
grass/branches/releasebranch_7_0/lib/python/pygrass/modules/interface/module.py
Log:
pygrass library: Backport of r62711 from trunk
Modified: grass/branches/releasebranch_7_0/lib/python/pygrass/modules/interface/module.py
===================================================================
--- grass/branches/releasebranch_7_0/lib/python/pygrass/modules/interface/module.py 2014-11-22 22:27:27 UTC (rev 62862)
+++ grass/branches/releasebranch_7_0/lib/python/pygrass/modules/interface/module.py 2014-11-22 22:29:06 UTC (rev 62863)
@@ -67,29 +67,57 @@
number of parallel processes it will wait for all processes to finish,
sets the stdout and stderr of the Module object and removes it
from the queue when its finished.
+
+ To finish the queue before the maximum number of parallel
+ processes was reached call wait() .
This class will raise a GrassError in case a Module process exits
with a return code other than 0.
Usage:
+ Check with a queue size of 3 and 5 processes
+
>>> import copy
>>> from grass.pygrass.modules import Module, ParallelModuleQueue
>>> mapcalc_list = []
+
+ Setting run_ to False is important, otherwise a parallel processing is not possible
+
>>> mapcalc = Module("r.mapcalc", overwrite=True, run_=False)
>>> queue = ParallelModuleQueue(nprocs=3)
>>> for i in xrange(5):
... new_mapcalc = copy.deepcopy(mapcalc)
... mapcalc_list.append(new_mapcalc)
- ... new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
- ... queue.put(new_mapcalc)
- Module('r.mapcalc')
- Module('r.mapcalc')
- Module('r.mapcalc')
- Module('r.mapcalc')
- Module('r.mapcalc')
+ ... m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
+ ... queue.put(m)
+ >>> queue.wait()
+ >>> queue.get_num_run_procs()
+ 0
+ >>> queue.get_max_num_procs()
+ 3
+ >>> for mapcalc in mapcalc_list:
+ ... print(mapcalc.popen.returncode)
+ 0
+ 0
+ 0
+ 0
+ 0
+ Check with a queue size of 8 and 5 processes
+
+ >>> queue = ParallelModuleQueue(nprocs=8)
+ >>> mapcalc_list = []
+ >>> for i in xrange(5):
+ ... new_mapcalc = copy.deepcopy(mapcalc)
+ ... mapcalc_list.append(new_mapcalc)
+ ... m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
+ ... queue.put(m)
>>> queue.wait()
+ >>> queue.get_num_run_procs()
+ 0
+ >>> queue.get_max_num_procs()
+ 8
>>> for mapcalc in mapcalc_list:
... print(mapcalc.popen.returncode)
0
@@ -98,6 +126,86 @@
0
0
+ Check with a queue size of 8 and 4 processes
+
+ >>> queue = ParallelModuleQueue(nprocs=8)
+ >>> mapcalc_list = []
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
+ >>> mapcalc_list.append(new_mapcalc)
+ >>> m = new_mapcalc(expression="test_pygrass_1 =1")
+ >>> queue.put(m)
+ >>> queue.get_num_run_procs()
+ 1
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
+ >>> mapcalc_list.append(new_mapcalc)
+ >>> m = new_mapcalc(expression="test_pygrass_2 =2")
+ >>> queue.put(m)
+ >>> queue.get_num_run_procs()
+ 2
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
+ >>> mapcalc_list.append(new_mapcalc)
+ >>> m = new_mapcalc(expression="test_pygrass_3 =3")
+ >>> queue.put(m)
+ >>> queue.get_num_run_procs()
+ 3
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
+ >>> mapcalc_list.append(new_mapcalc)
+ >>> m = new_mapcalc(expression="test_pygrass_4 =4")
+ >>> queue.put(m)
+ >>> queue.get_num_run_procs()
+ 4
+ >>> queue.wait()
+ >>> queue.get_num_run_procs()
+ 0
+ >>> queue.get_max_num_procs()
+ 8
+ >>> for mapcalc in mapcalc_list:
+ ... print(mapcalc.popen.returncode)
+ 0
+ 0
+ 0
+ 0
+
+ Check with a queue size of 3 and 4 processes
+
+ >>> queue = ParallelModuleQueue(nprocs=3)
+ >>> mapcalc_list = []
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
+ >>> mapcalc_list.append(new_mapcalc)
+ >>> m = new_mapcalc(expression="test_pygrass_1 =1")
+ >>> queue.put(m)
+ >>> queue.get_num_run_procs()
+ 1
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
+ >>> mapcalc_list.append(new_mapcalc)
+ >>> m = new_mapcalc(expression="test_pygrass_2 =2")
+ >>> queue.put(m)
+ >>> queue.get_num_run_procs()
+ 2
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
+ >>> mapcalc_list.append(new_mapcalc)
+ >>> m = new_mapcalc(expression="test_pygrass_3 =3")
+ >>> queue.put(m) # Now it will wait until all procs finish and set the counter back to 0
+ >>> queue.get_num_run_procs()
+ 0
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
+ >>> mapcalc_list.append(new_mapcalc)
+ >>> m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
+ >>> queue.put(m)
+ >>> queue.get_num_run_procs()
+ 1
+ >>> queue.wait()
+ >>> queue.get_num_run_procs()
+ 0
+ >>> queue.get_max_num_procs()
+ 3
+ >>> for mapcalc in mapcalc_list:
+ ... print(mapcalc.popen.returncode)
+ 0
+ 0
+ 0
+ 0
+
"""
def __init__(self, nprocs=1):
"""Constructor
@@ -147,13 +255,14 @@
"""Get the number of Module processes that are in the queue running
or finished
- :returns: the maximum number fo Module processes running/finished in
- the queue
+ :returns: the number fo Module processes running/finished in the queue
"""
- return len(self._list)
+ return self._proc_count
def get_max_num_procs(self):
"""Return the maximum number of parallel Module processes
+
+ :returns: the maximum number of parallel Module processes
"""
return self._num_procs
More information about the grass-commit
mailing list