[GRASS-SVN] r73076 - in sandbox/wenzeslaus: . r.neighbors.mp
svn_grass at osgeo.org
svn_grass at osgeo.org
Sat Aug 11 19:33:14 PDT 2018
Author: wenzeslaus
Date: 2018-08-11 19:33:14 -0700 (Sat, 11 Aug 2018)
New Revision: 73076
Added:
sandbox/wenzeslaus/r.neighbors.mp/
sandbox/wenzeslaus/r.neighbors.mp/Makefile
sandbox/wenzeslaus/r.neighbors.mp/r.neighbors.mp.html
sandbox/wenzeslaus/r.neighbors.mp/r.neighbors.mp.py
Log:
testing code for TiledWorkflow parallelization approach
Added: sandbox/wenzeslaus/r.neighbors.mp/Makefile
===================================================================
--- sandbox/wenzeslaus/r.neighbors.mp/Makefile (rev 0)
+++ sandbox/wenzeslaus/r.neighbors.mp/Makefile 2018-08-12 02:33:14 UTC (rev 73076)
@@ -0,0 +1,7 @@
+MODULE_TOPDIR = ../..
+
+PGM = r.neighbors.mp
+
+include $(MODULE_TOPDIR)/include/Make/Script.make
+
+default: script
Property changes on: sandbox/wenzeslaus/r.neighbors.mp/Makefile
___________________________________________________________________
Added: svn:mime-type
## -0,0 +1 ##
+text/x-makefile
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: sandbox/wenzeslaus/r.neighbors.mp/r.neighbors.mp.html
===================================================================
--- sandbox/wenzeslaus/r.neighbors.mp/r.neighbors.mp.html (rev 0)
+++ sandbox/wenzeslaus/r.neighbors.mp/r.neighbors.mp.html 2018-08-12 02:33:14 UTC (rev 73076)
@@ -0,0 +1,9 @@
+Test module for TiledWorkflow parallelization approach.
+
+Has several levels of interface. Can be customized to do more.
+Main interface idea is that it looks like grass.script.
+
+Developed in 2016, likely superseded by combination of MultiModule and
+ParallelModuleQueue. Interface should be polished.
+Very slow due to patching ("untiling"). Slower for single module calls
+of a fast module like r.neighbors.
Property changes on: sandbox/wenzeslaus/r.neighbors.mp/r.neighbors.mp.html
___________________________________________________________________
Added: svn:mime-type
## -0,0 +1 ##
+text/html
\ No newline at end of property
Added: svn:keywords
## -0,0 +1 ##
+Author Date Id
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Added: sandbox/wenzeslaus/r.neighbors.mp/r.neighbors.mp.py
===================================================================
--- sandbox/wenzeslaus/r.neighbors.mp/r.neighbors.mp.py (rev 0)
+++ sandbox/wenzeslaus/r.neighbors.mp/r.neighbors.mp.py 2018-08-12 02:33:14 UTC (rev 73076)
@@ -0,0 +1,570 @@
+#!/usr/bin/env python
+############################################################################
+#
+# MODULE: r.neighbors.mp
+# AUTHOR(S): Vaclav Petras
+# PURPOSE: Wrapper for r.neighbors for experimenting with parallelization
+# COPYRIGHT: (C) 2018 by Vaclav Petras, and the GRASS Development Team
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+############################################################################
+
+#%module
+#% description: Makes each cell category value a function of the category values assigned to the cells around it, and stores new cell values in an output raster map layer.
+#% keyword: raster
+#% keyword: algebra
+#% keyword: statistics
+#% keyword: aggregation
+#% keyword: neighbor
+#% keyword: focal statistics
+#% keyword: filter
+#%end
+#%flag
+#% key: a
+#% description: Do not align output with the input
+#%end
+#%flag
+#% key: c
+#% description: Use circular neighborhood
+#% guisection: Neighborhood
+#%end
+#%option G_OPT_R_INPUT
+#%end
+#%option G_OPT_R_INPUT
+#% key: selection
+#% required: no
+#% description: Name of an input raster map to select the cells which should be processed
+#%end
+#%option G_OPT_R_OUTPUT
+#% multiple: yes
+#%end
+#%option
+#% key: method
+#% type: string
+#% required: no
+#% multiple: yes
+#% options: average,median,mode,minimum,maximum,range,stddev,sum,count,variance,diversity,interspersion,quart1,quart3,perc90,quantile
+#% description: Neighborhood operation
+#% answer: average
+#% guisection: Neighborhood
+#%end
+#%option
+#% key: size
+#% type: integer
+#% required: no
+#% multiple: no
+#% description: Neighborhood size
+#% answer: 3
+#% guisection: Neighborhood
+#%end
+#%option
+#% key: title
+#% type: string
+#% required: no
+#% multiple: no
+#% key_desc: phrase
+#% description: Title for output raster map
+#%end
+#%option G_OPT_F_INPUT
+#% key: weight
+#% required: no
+#% description: Text file containing weights
+#%end
+#%option
+#% key: gauss
+#% type: double
+#% required: no
+#% multiple: no
+#% description: Sigma (in cells) for Gaussian filter
+#%end
+#%option
+#% key: quantile
+#% type: double
+#% required: no
+#% multiple: yes
+#% options: 0.0-1.0
+#% description: Quantile to calculate for method=quantile
+#%end
+#%option
+#% key: nprocs
+#% type: integer
+#% required: yes
+#% description: Number of processes for parallel computing
+#% guisection: Parallel
+#%end
+#%option
+#% key: width
+#% type: integer
+#% required: yes
+#% description: Width of a tile in cells
+#% guisection: Parallel
+#%end
+#%option
+#% key: height
+#% type: integer
+#% required: yes
+#% description: Height of a tile in cells
+#% guisection: Parallel
+#%end
+#%option
+#% key: overlap
+#% type: integer
+#% required: yes
+#% description: Overlap of tile in cells
+#% guisection: Parallel
+#%end
+
+
+import os
+import sys
+import copy
+from multiprocessing import Pool
+import atexit
+import uuid
+
+from collections import namedtuple
+
+import grass.script as gs
+from grass.exceptions import CalledModuleError
+
+
+def remove_keys_from_dict(dictionary, keys):
+ for key in keys:
+ del dictionary[key]
+
+
+def remove_false_values():
+ return {key: value for key, value in dictionary.items() if value}
+
+
+def flags_dict_to_str(flags)
+ return "".join([value for key, value in flags.items() if value])
+
+
+Region = namedtuple(
+ 'Region', 'west, east, north, south, rows, cols, ewres, nsres')
+
+
+def get_current_region():
+ reg = gs.parse_command('g.region', flags='g3')
+ return Region(west=float(reg['w']), east=float(reg['e']),
+ north=float(reg['n']), south=float(reg['s']),
+ rows=int(reg['rows']), cols=int(reg['cols']),
+ ewres=float(reg['ewres']), nsres=float(reg['nsres']))
+
+
+def get_tempory_region(do_cleanup=True):
+ """Copies the current region to a temporary region with "g.region save=",
+ then returns name of that region. Installs an atexit
+ handler to delete the temporary region upon termination by default.
+ """
+ name = "tmp.%s.%s" % (os.path.basename(sys.argv[0]),
+ str(uuid.uuid4()).replace('-', ''))
+ gs.run_command('g.region', save=name, overwrite=True)
+ if do_cleanup:
+ atexit.register(delete_temporary_region, name)
+ return name
+
+
+def delete_temporary_region(name):
+ """Removes any region named by it."""
+ try:
+ gs.run_command('g.remove', type='region', name=name,
+ flags='f', quiet=True)
+ except CalledModuleError:
+ pass
+
+
+BBox = namedtuple('BBox', 'west, east, north, south')
+
+
+def region_to_tiles(width, height, overlap=0, region=None):
+ """
+
+ >>> reg = Region()
+ >>> reg.north = 1000
+ >>> reg.south = 0
+ >>> reg.nsres = 1
+ >>> reg.east = 2000
+ >>> reg.west = 0
+ >>> reg.ewres = 1
+ >>> reg.cols
+ 2000
+ >>> reg.rows
+ 1000
+ >>> tiles = region_to_tiles(500, 500, 10, reg)
+ >>> len(tiles)
+ 8
+ """
+ lists = split_region_tiles(region=region, width=width,
+ height=height, overlap=overlap)
+ return [item for sublist in lists for item in sublist]
+
+
+# copy of from grass.pygrass.modules.grid.split import split_region_tiles get_bbox
+def get_bbox(reg, row, col, width, height, overlap):
+ """Return a Bbox
+
+ :param reg: a Region object to split
+ :type reg: Region object
+ :param row: the number of row
+ :type row: int
+ :param col: the number of row
+ :type col: int
+ :param width: the width of tiles
+ :type width: int
+ :param height: the width of tiles
+ :type height: int
+ :param overlap: the value of overlap between tiles
+ :type overlap: int
+ """
+ north = reg.north - (row * height - overlap) * reg.nsres
+ south = reg.north - ((row + 1) * height + overlap) * reg.nsres
+ east = reg.west + ((col + 1) * width + overlap) * reg.ewres
+ west = reg.west + (col * width - overlap) * reg.ewres
+ return BBox(north=north if north <= reg.north else reg.north,
+ south=south if south >= reg.south else reg.south,
+ east=east if east <= reg.east else reg.east,
+ west=west if west >= reg.west else reg.west,)
+
+
+# copy of from grass.pygrass.modules.grid.split import split_region_tiles
+def split_region_tiles(region, width=100, height=100, overlap=0):
+ """Spit a region into a list of bounding boxes"""
+ ncols = (region.cols + width - 1) // width
+ nrows = (region.rows + height - 1) // height
+ box_list = []
+ for row in range(nrows):
+ row_list = []
+ for col in range(ncols):
+ #print 'c', c, 'r', r
+ row_list.append(get_bbox(region, row, col, width, height, overlap))
+ box_list.append(row_list)
+ return box_list
+
+
+def dict_parse_command(*args, **kwargs):
+ return dict(gs.parse_command(*args, **kwargs))
+
+
+# this is basically a list of functors which has a special way of adding
+# more items to it
+class ModuleCallList(list):
+
+ def __init__(self, bbox=None):
+ super(ModuleCallList, self).__init__()
+ self.env = None
+ if bbox:
+ self.bbox = bbox
+ self.env = os.environ.copy()
+ self.env['WIND_OVERRIDE'] = get_tempory_region()
+ bbox = self.bbox
+ gs.run_command('g.region', w=bbox.west, e=bbox.east,
+ n=bbox.north, s=bbox.south, env=self.env)
+
+ def run_command(self, *args, **kwargs):
+ # TODO: what should happen when env is provided?
+ if 'env' not in kwargs:
+ kwargs['env'] = self.env
+ self.append(ModuleCall(gs.run_command, *args, **kwargs))
+
+ def read_command(self, *args, **kwargs):
+ if 'env' not in kwargs:
+ kwargs['env'] = self.env
+ self.append(ModuleCall(gs.read_command, *args, **kwargs))
+
+ def parse_command(self, *args, **kwargs):
+ if 'env' not in kwargs:
+ kwargs['env'] = self.env
+ # TODO: only normal dict can get pickled, not the GRASS KeyVal
+ self.append(ModuleCall(dict_parse_command, *args, **kwargs))
+
+ def mapcalc(self, *args, **kwargs):
+ if 'env' not in kwargs:
+ kwargs['env'] = self.env
+ self.append(ModuleCall(gs.mapcalc, *args, **kwargs))
+
+ def mapcalc3d(self, *args, **kwargs):
+ if 'env' not in kwargs:
+ kwargs['env'] = self.env
+ self.append(ModuleCall(gs.mapcalc3d, *args, **kwargs))
+
+ def execute(self):
+ """Execute all queued processes.
+
+ Return whatever was the last return value
+ """
+ ret = None
+ for call in self:
+ ret = call.execute()
+ return ret
+
+
+# this is basically a functor
+class ModuleCall(object):
+ def __init__(self, function, *args, **kwargs):
+ self.function = function
+ self.args = args
+ self.kwargs = kwargs
+
+ def execute(self):
+ """Execute all associated process.
+
+ Returns the original return value
+ """
+ return self.function(*(self.args), **(self.kwargs))
+
+
+# TODO: do we need the functions twice?
+def execute_module_call_list(module_list):
+ try:
+ return module_list.execute()
+ except (KeyboardInterrupt, CalledModuleError):
+ return
+
+
+def execute_module_call(call):
+ try:
+ return call.execute()
+ except (KeyboardInterrupt, CalledModuleError):
+ return
+
+
+# TODO: introduce better naming for modules and functions
+# TODO: introduce debug, dry_run and no_clean_tmp wherever appropriate
+def execute_list(module_lists, nprocs, debug=False):
+ if debug:
+ result = []
+ for module_list in module_lists:
+ result.append(module_list.execute())
+ return result
+ pool = Pool(nprocs)
+ proc = pool.map_async(execute_module_call_list, module_lists)
+ # TODO: error handling should be improved
+ try:
+ return proc.get()
+ except (KeyboardInterrupt, CalledModuleError):
+ # TODO: this should be done in a better way
+ sys.exit("KeyboardInterrupt, CalledModuleError")
+
+
+# TODO: errors lead to strange behavior, is legal to wrap map_async like this?
+def execute_function_on_list(module_lists, function, nprocs, debug=False):
+ if debug:
+ result = []
+ for module_list in module_lists:
+ result.append(function(module_list))
+ return result
+ pool = Pool(nprocs)
+ proc = pool.map_async(function, module_lists)
+ # TODO: error handling should be improved
+ try:
+ return proc.get()
+ except (KeyboardInterrupt, CalledModuleError):
+ # TODO: this should be done in a better way
+ sys.exit("KeyboardInterrupt, CalledModuleError")
+
+
+def execute_by_module(modules, nprocs):
+ pool = Pool(nprocs)
+ proc = pool.map_async(execute_module_call, modules)
+ try:
+ return proc.get()
+ except (KeyboardInterrupt, CalledModuleError):
+ # TODO: this should be done in a better way
+ sys.exit("KeyboardInterrupt, CalledModuleError")
+
+
+def generate_patch_expression(rasters, bboxes):
+ exprs = []
+ expr = ("if(x() >= {b.west} && x() <= {b.east} "
+ "&& y() >= {b.south} && y() <= {b.north}, {m},")
+ for raster, bbox in zip(rasters, bboxes):
+ exprs.append(expr.format(b=bbox, m=raster))
+ return " ".join(exprs) + " null()" + ")" * len(exprs)
+
+# TODO: patch looses color table
+
+def patch_rasters(rasters, bboxes, output):
+ expression = generate_patch_expression(rasters, bboxes)
+ gs.mapcalc("%s = %s" % (output, expression))
+
+
+def patch_rasters3d(rasters, bboxes, output):
+ expression = generate_patch_expression(rasters, bboxes)
+ gs.mapcalc3d("%s = %s" % (output, expression))
+
+
+def patch_rasters_2(args):
+ patch_rasters(*args)
+
+
+def patch_rasters3d_2(args):
+ patch_rasters3d(*args)
+
+
+def remove_rasters(maps):
+ gs.run_command('g.remove', type='raster', name=maps,
+ flags='f', quiet=True)
+
+
+def remove_rasters3d(maps):
+ gs.run_command('g.remove', type='raster3d', name=maps,
+ flags='f', quiet=True)
+
+
+class Namer(object):
+ def __init__(self, bbox, number, callback):
+ self.bbox = bbox
+ self.number = number # TODO: this is a propertyand needs a better name
+ self.callback = callback
+
+ def name(self, type, name, patch=True):
+ tile_name = "%s_%d" % (name, self.number)
+ self.callback(type, name, tile_name, self.bbox)
+ return tile_name
+
+
+# this idea for the class is that it is using the general functions
+class TiledWorkflow(object):
+
+ def __init__(self, nprocs, width, height, overlap=0, region=None, debug=False):
+ self.nprocs = nprocs
+ self.region = get_current_region()
+ self.bboxes = region_to_tiles(
+ region=self.region, width=width, height=height,
+ overlap=overlap)
+ self.bboxes_no_overlap = region_to_tiles(
+ region=self.region, width=width, height=height)
+ self.lists_of_rasters = [] # tuple(list, list, str)
+ self.lists_of_rasters3d = [] # tuple(list, list, str)
+ self.elements_to_patch = {}
+ self.calls = []
+ self.debug = debug
+ self.iteration_index = 0
+ self.iteration_max = len(self.bboxes)
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if self.iteration_index >= self.iteration_max:
+ raise StopIteration
+ else:
+ bbox = self.bboxes[self.iteration_index]
+ bbox_no_overlap = self.bboxes_no_overlap[self.iteration_index]
+ namer = Namer(bbox=bbox_no_overlap, number=self.iteration_index,
+ callback=self.add_element_to_patch)
+ call = ModuleCallList(bbox=bbox)
+ self.calls.append(call)
+ self.iteration_index += 1
+ return namer, call
+
+ def execute_tiled(self):
+ return execute_list(self.calls, nprocs=self.nprocs, debug=self.debug)
+
+ def add_rasters_to_patch(self, inputs, output):
+ if len(self.bboxes) != len(inputs):
+ raise ValueError("Number of items to patch must be the same"
+ " as the number of bounding boxes")
+ self.lists_of_rasters.append((inputs, self.bboxes_no_overlap, output))
+
+ def add_rasters3d_to_patch(self, inputs, output):
+ if len(self.bboxes) != len(inputs):
+ raise ValueError("Number of items to patch must be the same"
+ " as the number of bounding boxes")
+ self.lists_of_rasters3d.append((inputs, self.bboxes_no_overlap, output))
+
+ def add_element_to_patch(self, type, name, tile_name, bbox):
+ if not type in self.elements_to_patch:
+ self.elements_to_patch[type] = {}
+ if not name in self.elements_to_patch[type]:
+ self.elements_to_patch[type][name] = {'tiles': [], 'bboxes': []}
+ self.elements_to_patch[type][name]['tiles'].append(tile_name)
+ self.elements_to_patch[type][name]['bboxes'].append(bbox)
+
+ def patch(self):
+ for type, names in self.elements_to_patch.iteritems():
+ if type == 'raster':
+ for name, info in names.iteritems():
+ self.lists_of_rasters.append((info['tiles'], info['bboxes'], name))
+
+ execute_function_on_list(self.lists_of_rasters, function=patch_rasters_2, nprocs=self.nprocs, debug=self.debug)
+ execute_function_on_list(self.lists_of_rasters3d, function=patch_rasters3d_2, nprocs=self.nprocs, debug=self.debug)
+ if self.debug:
+ patch_rasters(range(0, len(self.bboxes_no_overlap)), self.bboxes_no_overlap, 'tiles_no_overlaps')
+ patch_rasters(range(0, len(self.bboxes)), self.bboxes, 'tiles_with_overlaps')
+ # TODO: parallelize
+ # TODO: perhaps special option for temp files, or even preserve tiles
+ if not self.debug:
+ for rasters, bbox, output in self.lists_of_rasters:
+ remove_rasters(rasters)
+ for rasters, bbox, output in self.lists_of_rasters3d:
+ remove_rasters3d(rasters)
+
+ def execute(self):
+ result = self.execute_tiled()
+ self.patch()
+ return result
+
+
+def main(options, flags):
+ input = options['input']
+ outputs = options['output']
+ nprocs = int(options['nprocs'])
+ width = int(options['width'])
+ height = int(options['height'])
+ overlap = int(options['overlap'])
+
+ # TODO: theoretically we could also just take nprocs
+ # and do some guesses for the rest (including overlap)
+ # TODO: check existence of input and output beforehand
+
+ # shallow copy to just pass the remaining options
+ neighbors_options = copy.copy(options)
+ # remove options for this module
+ this_module_opts = [
+ 'input', 'output', 'nprocs', 'width', 'height', 'overlap']
+ remove_key_from_dict(neighbors_options, this_module_opts)
+ # pass only options which are set
+ neighbors_options = remove_false_values(neighbors_options)
+
+ # pass active flags to the module
+ neighbors_flags = flags_dict_to_str(flags)
+
+ # TODO: Align output with the input (opposite of -a) is not
+ # supported, probably a g.region call needed here
+
+ tiled_workflow = TiledWorkflow(nprocs=nprocs, width=width,
+ height=height, overlap=overlap)
+
+ gs.message(_("Splitting processing into {} tiles").format(
+ len(tiled_workflow.bboxes)))
+
+ for namer, workflow in tiled_workflow:
+ # get name for each of the (potential) outputs
+ outs = []
+ for output in outputs.split(','):
+ outs.append(namer.name('raster', output))
+
+ workflow.run_command('r.neighbors', input=input,
+ output=outs, quiet=True,
+ flags=neighbors_flags, **neighbors_options)
+
+ results = tiled_workflow.execute()
+
+ # TODO: title for output not supported
+ # TODO: proper history not provided
+
+
+if __name__ == "__main__":
+ sys.exit(main(*gs.parser()))
Property changes on: sandbox/wenzeslaus/r.neighbors.mp/r.neighbors.mp.py
___________________________________________________________________
Added: svn:mime-type
## -0,0 +1 ##
+text/x-python
\ No newline at end of property
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
More information about the grass-commit
mailing list