[GRASS-SVN] r66325 - in grass/trunk: lib/python/temporal temporal/t.rast.aggregate temporal/t.rast.aggregate/testsuite

svn_grass at osgeo.org svn_grass at osgeo.org
Fri Sep 25 02:19:06 PDT 2015


Author: huhabla
Date: 2015-09-25 02:19:06 -0700 (Fri, 25 Sep 2015)
New Revision: 66325

Modified:
   grass/trunk/lib/python/temporal/aggregation.py
   grass/trunk/temporal/t.rast.aggregate/t.rast.aggregate.py
   grass/trunk/temporal/t.rast.aggregate/testsuite/test_aggregation_absolute.py
   grass/trunk/temporal/t.rast.aggregate/testsuite/test_aggregation_absolute_parallel.py
   grass/trunk/temporal/t.rast.aggregate/testsuite/test_aggregation_relative.py
Log:
temporal framework: Fixed a bug in t.rast.aggregate, which did not wait for the processes in the process queue to finish.
Now a warning is printed if the file limit is exceeded, in which case r.series is run with the z flag.
A new option (file_limit) was added to t.rast.aggregate to set the file limit.


Modified: grass/trunk/lib/python/temporal/aggregation.py
===================================================================
--- grass/trunk/lib/python/temporal/aggregation.py	2015-09-24 19:31:33 UTC (rev 66324)
+++ grass/trunk/lib/python/temporal/aggregation.py	2015-09-25 09:19:06 UTC (rev 66325)
@@ -182,7 +182,8 @@
 
 def aggregate_by_topology(granularity_list, granularity, map_list, topo_list,
                           basename, time_suffix, offset=0, method="average",
-                          nprocs=1, spatial=None, dbif=None, overwrite=False):
+                          nprocs=1, spatial=None, dbif=None, overwrite=False,
+                          file_limit=1000):
     """Aggregate a list of raster input maps with r.series
 
        :param granularity_list: A list of AbstractMapDataset objects.
@@ -207,6 +208,8 @@
                        east, south, north, bottom, top
        :param dbif: The database interface to be used
        :param overwrite: Overwrite existing raster maps
+       :param file_limit: The maximum number of raster map layers that
+                          should be opened at once by r.series
        :return: A list of RasterDataset objects that contain the new map names
                 and the temporal extent for map registration
     """
@@ -303,7 +306,12 @@
 
                 mod = copy.deepcopy(r_series)
                 mod(file=filename, output=output_name)
-                if len(aggregation_list) > 1000:
+                if len(aggregation_list) > int(file_limit):
+                    msgr.warning(_("The limit of open fiels (%i) was "\
+                                   "reached (%i). The module r.series will "\
+                                   "be run with flag z, to avoid open "\
+                                   "files limit exceeding."%(int(file_limit),
+                                                             len(aggregation_list))))
                     mod(flags="z")
                 process_queue.put(mod)
             else:
@@ -311,6 +319,8 @@
                 mod(raster=[aggregation_list[0],  output_name])
                 process_queue.put(mod)
 
+    process_queue.wait()
+
     if connected:
         dbif.close()
 

Modified: grass/trunk/temporal/t.rast.aggregate/t.rast.aggregate.py
===================================================================
--- grass/trunk/temporal/t.rast.aggregate/t.rast.aggregate.py	2015-09-24 19:31:33 UTC (rev 66324)
+++ grass/trunk/temporal/t.rast.aggregate/t.rast.aggregate.py	2015-09-25 09:19:06 UTC (rev 66325)
@@ -73,6 +73,15 @@
 #% answer: 1
 #%end
 
+#%option
+#% key: file_limit
+#% type: integer
+#% description: The maximum number of open files allowed for each r.series process
+#% required: no
+#% multiple: no
+#% answer: 1000
+#%end
+
 #%option G_OPT_T_SAMPLE
 #% options: equal,overlaps,overlapped,starts,started,finishes,finished,during,contains
 #% answer: contains
@@ -109,12 +118,13 @@
     sampling = options["sampling"]
     offset = options["offset"]
     nprocs = options["nprocs"]
+    file_limit = options["file_limit"]
     time_suffix = flags["s"]
-    
+
     topo_list = sampling.split(",")
 
     tgis.init()
-    
+
     dbif = tgis.SQLDatabaseInterfaceConnection()
     dbif.connect()
 
@@ -128,7 +138,7 @@
 
     # We will create the strds later, but need to check here
     tgis.check_new_stds(output, "strds",   dbif,  gcore.overwrite())
-    
+
     start_time = map_list[0].temporal_extent.get_start_time()
 
     if sp.is_time_absolute():
@@ -163,26 +173,26 @@
             end = start_time + int(gran)
             granule.set_relative_time(start, end,  sp.get_relative_time_unit())
         start_time = end
-        
+
         granularity_list.append(granule)
 
-    output_list = tgis.aggregate_by_topology(granularity_list=granularity_list,  granularity=gran,  
-                                                                       map_list=map_list,  
+    output_list = tgis.aggregate_by_topology(granularity_list=granularity_list,  granularity=gran,
+                                                                       map_list=map_list,
                                                                        topo_list=topo_list,  basename=base, time_suffix=time_suffix,
-                                                                       offset=offset,  method=method,  nprocs=nprocs,  spatial=None, 
-                                                                       overwrite=gcore.overwrite())
+                                                                       offset=offset,  method=method,  nprocs=nprocs,  spatial=None,
+                                                                       overwrite=gcore.overwrite(), file_limit=file_limit)
 
     if output_list:
         temporal_type, semantic_type, title, description = sp.get_initial_values()
         output_strds = tgis.open_new_stds(output, "strds", temporal_type,
                                                                  title, description, semantic_type,
                                                                  dbif, gcore.overwrite())
-        if register_null: 
-            register_null=False 
-        else: 
+        if register_null:
+            register_null=False
+        else:
             register_null=True
-        
-        tgis.register_map_object_list("rast", output_list,  output_strds, register_null,  
+
+        tgis.register_map_object_list("rast", output_list,  output_strds, register_null,
                                                        sp.get_relative_time_unit(),  dbif)
 
         # Update the raster metadata table entries with aggregation type

Modified: grass/trunk/temporal/t.rast.aggregate/testsuite/test_aggregation_absolute.py
===================================================================
--- grass/trunk/temporal/t.rast.aggregate/testsuite/test_aggregation_absolute.py	2015-09-24 19:31:33 UTC (rev 66324)
+++ grass/trunk/temporal/t.rast.aggregate/testsuite/test_aggregation_absolute.py	2015-09-25 09:19:06 UTC (rev 66325)
@@ -22,42 +22,42 @@
         os.putenv("GRASS_OVERWRITE",  "1")
         tgis.init()
         cls.use_temp_region()
-        cls.runModule("g.region",  s=0,  n=80,  w=0,  e=120,  b=0,  
+        cls.runModule("g.region",  s=0,  n=80,  w=0,  e=120,  b=0,
                       t=50,  res=10,  res3=10)
-        cls.runModule("r.mapcalc", expression="a1 = 100",  overwrite=True)
-        cls.runModule("r.mapcalc", expression="a2 = 200",  overwrite=True)
-        cls.runModule("r.mapcalc", expression="a3 = 300",  overwrite=True)
-        cls.runModule("r.mapcalc", expression="a4 = 400",  overwrite=True)
-        cls.runModule("r.mapcalc", expression="a5 = 500",  overwrite=True)
-        cls.runModule("r.mapcalc", expression="a6 = 600",  overwrite=True)
+        cls.runModule("r.mapcalc", expression="a1 = 100.0",  overwrite=True)
+        cls.runModule("r.mapcalc", expression="a2 = 200.0",  overwrite=True)
+        cls.runModule("r.mapcalc", expression="a3 = 300.0",  overwrite=True)
+        cls.runModule("r.mapcalc", expression="a4 = 400.0",  overwrite=True)
+        cls.runModule("r.mapcalc", expression="a5 = 500.0",  overwrite=True)
+        cls.runModule("r.mapcalc", expression="a6 = 600.0",  overwrite=True)
         cls.runModule("r.mapcalc", expression="a7 = null()",  overwrite=True)
 
-        cls.runModule("t.create",  type="strds",  temporaltype="absolute",  
-                                    output="A",  title="A test",  
+        cls.runModule("t.create",  type="strds",  temporaltype="absolute",
+                                    output="A",  title="A test",
                                     description="A test",  overwrite=True)
 
-        cls.runModule("t.register", flags="i",  type="raster",  input="A",  
+        cls.runModule("t.register", flags="i",  type="raster",  input="A",
                                      maps="a1,a2,a3,a4,a5,a6,a7",
-                                     start="2001-01-15 12:05:45", 
-                                     increment="14 days",  
+                                     start="2001-01-15 12:05:45",
+                                     increment="14 days",
                                      overwrite=True)
     @classmethod
     def tearDownClass(cls):
         """Remove the temporary region
         """
-        cls.del_temp_region()        
+        cls.del_temp_region()
         cls.runModule("t.remove", flags="rf", type="strds", inputs="A")
 
     def tearDown(self):
-        """Remove generated data"""    
+        """Remove generated data"""
         self.runModule("t.remove", flags="rf", type="strds", inputs="B")
-        
+
     def test_disaggregation(self):
         """Disaggregation with empty maps"""
         self.assertModule("t.rast.aggregate", input="A", output="B",
-                          basename="b", granularity="2 days", 
+                          basename="b", granularity="2 days",
                           method="average",
-                          sampling=["overlaps","overlapped","during"], 
+                          sampling=["overlaps","overlapped","during"],
                           nprocs=2, flags="n")
 
         tinfo_string="""start_time=2001-01-15 00:00:00
@@ -74,15 +74,15 @@
         info = SimpleModule("t.info", flags="g", input="B")
         #info.run()
         #print info.outputs.stdout
-        self.assertModuleKeyValue(module=info, reference=tinfo_string, 
+        self.assertModuleKeyValue(module=info, reference=tinfo_string,
                                   precision=2, sep="=")
 
     def test_aggregation_1month(self):
         """Aggregation one month"""
         self.assertModule("t.rast.aggregate", input="A", output="B",
                           basename="b", granularity="1 months",
-                          method="maximum", sampling=["contains"], 
-                          nprocs=3, flags="s")
+                          method="maximum", sampling=["contains"],
+                          file_limit=0, nprocs=3, flags="s")
 
         tinfo_string="""start_time=2001-01-01 00:00:00
                         end_time=2001-04-01 00:00:00
@@ -98,23 +98,23 @@
         info = SimpleModule("t.info", flags="g", input="B")
         #info.run()
         #print info.outputs.stdout
-        self.assertModuleKeyValue(module=info, reference=tinfo_string, 
+        self.assertModuleKeyValue(module=info, reference=tinfo_string,
                                   precision=2, sep="=")
 
         # Check the map names are correct
-        lister = SimpleModule("t.rast.list", input="B", columns="name", 
+        lister = SimpleModule("t.rast.list", input="B", columns="name",
                               flags="s")
         self.runModule(lister)
         #print lister.outputs.stdout
         maps="b_2001_01" + os.linesep + "b_2001_02" + os.linesep + \
              "b_2001_03" + os.linesep
         self.assertEqual(maps, lister.outputs.stdout)
-        
+
     def test_aggregation_2months(self):
         """Aggregation two month"""
         self.assertModule("t.rast.aggregate", input="A", output="B",
                           basename="b", granularity="2 months",
-                          method="minimum", sampling=["contains"], 
+                          method="minimum", sampling=["contains"],
                           nprocs=4, offset=10)
 
         tinfo_string="""start_time=2001-01-01 00:00:00
@@ -131,23 +131,23 @@
         info = SimpleModule("t.info", flags="g", input="B")
         #info.run()
         #print info.outputs.stdout
-        self.assertModuleKeyValue(module=info, reference=tinfo_string, 
+        self.assertModuleKeyValue(module=info, reference=tinfo_string,
                                   precision=2, sep="=")
 
         # Check the map names are correct
-        lister = SimpleModule("t.rast.list", input="B", columns="name", 
+        lister = SimpleModule("t.rast.list", input="B", columns="name",
                               flags="s")
         self.runModule(lister)
         #print lister.outputs.stdout
-        maps="b_11" + os.linesep + "b_12" + os.linesep 
+        maps="b_11" + os.linesep + "b_12" + os.linesep
         self.assertEqual(maps, lister.outputs.stdout)
 
     def test_aggregation_3months(self):
         """Aggregation three month"""
         self.assertModule("t.rast.aggregate", input="A", output="B",
                           basename="b", granularity="3 months",
-                          method="sum", sampling=["contains"], 
-                          nprocs=9, offset=100)
+                          method="sum", sampling=["contains"],
+                          file_limit=0, nprocs=9, offset=100)
 
         tinfo_string="""start_time=2001-01-01 00:00:00
                         end_time=2001-04-01 00:00:00
@@ -163,15 +163,15 @@
         info = SimpleModule("t.info", flags="g", input="B")
         #info.run()
         #print info.outputs.stdout
-        self.assertModuleKeyValue(module=info, reference=tinfo_string, 
+        self.assertModuleKeyValue(module=info, reference=tinfo_string,
                                   precision=2, sep="=")
 
         # Check the map names are correct
-        lister = SimpleModule("t.rast.list", input="B", columns="name", 
+        lister = SimpleModule("t.rast.list", input="B", columns="name",
                               flags="s")
         self.runModule(lister)
         #print lister.outputs.stdout
-        maps="b_101" + os.linesep 
+        maps="b_101" + os.linesep
         self.assertEqual(maps, lister.outputs.stdout)
 
 if __name__ == '__main__':

Modified: grass/trunk/temporal/t.rast.aggregate/testsuite/test_aggregation_absolute_parallel.py
===================================================================
--- grass/trunk/temporal/t.rast.aggregate/testsuite/test_aggregation_absolute_parallel.py	2015-09-24 19:31:33 UTC (rev 66324)
+++ grass/trunk/temporal/t.rast.aggregate/testsuite/test_aggregation_absolute_parallel.py	2015-09-25 09:19:06 UTC (rev 66325)
@@ -23,41 +23,41 @@
         os.putenv("GRASS_OVERWRITE",  "1")
         tgis.init()
         cls.use_temp_region()
-        cls.runModule("g.region",  s=0,  n=80,  w=0,  e=120,  b=0,  
+        cls.runModule("g.region",  s=0,  n=80,  w=0,  e=120,  b=0,
                       t=50,  res=10,  res3=10)
-                      
+
         name_list =  []
         for i in range(540):
             cls.runModule("r.mapcalc", expression="a%i = %i"%(i + 1, i + 1),  overwrite=True)
             name_list.append("a%i"%(i + 1))
 
-        cls.runModule("t.create",  type="strds",  temporaltype="absolute",  
-                                    output="A",  title="A test",  
+        cls.runModule("t.create",  type="strds",  temporaltype="absolute",
+                                    output="A",  title="A test",
                                     description="A test",  overwrite=True)
 
-        cls.runModule("t.register", flags="i",  type="raster",  input="A",  
+        cls.runModule("t.register", flags="i",  type="raster",  input="A",
                                      maps=name_list,
-                                     start="2001-01-01", 
-                                     increment="4 hours",  
+                                     start="2001-01-01",
+                                     increment="4 hours",
                                      overwrite=True)
 
     @classmethod
     def tearDownClass(cls):
         """Remove the temporary region
         """
-        cls.del_temp_region()        
+        cls.del_temp_region()
         cls.runModule("t.remove", flags="rf", type="strds", inputs="A")
 
     def tearDown(self):
-        """Remove generated data"""    
+        """Remove generated data"""
         self.runModule("t.remove", flags="rf", type="strds", inputs="B")
 
     def test_aggregation_12hours(self):
         """Aggregation one month"""
         self.assertModule("t.rast.aggregate", input="A", output="B",
                           basename="b", granularity="12 hours",
-                          method="sum", sampling=["contains"], 
-                          nprocs=9, flags="s")
+                          method="sum", sampling=["contains"],
+                          nprocs=9, flags="s", file_limit=2)
 
         tinfo_string="""start_time=2001-01-01 00:00:00
                         end_time=2001-04-01 00:00:00
@@ -73,7 +73,7 @@
         info = SimpleModule("t.info", flags="g", input="B")
         #info.run()
         #print info.outputs.stdout
-        self.assertModuleKeyValue(module=info, reference=tinfo_string, 
+        self.assertModuleKeyValue(module=info, reference=tinfo_string,
                                   precision=2, sep="=")
 
     def test_aggregation_1day_4procs(self):
@@ -81,10 +81,10 @@
         start = datetime.now()
         self.assertModule("t.rast.aggregate", input="A", output="B",
                           basename="b", granularity="1 day",
-                          method="sum", sampling=["contains"], 
+                          method="sum", sampling=["contains"],
                           nprocs=4, flags="s")
         end = datetime.now()
-        
+
         delta = end - start
         print "test_aggregation_1day_4procs:",  delta.total_seconds()
 
@@ -98,7 +98,7 @@
         info = SimpleModule("t.info", flags="g", input="B")
         #info.run()
         #print info.outputs.stdout
-        self.assertModuleKeyValue(module=info, reference=tinfo_string, 
+        self.assertModuleKeyValue(module=info, reference=tinfo_string,
                                   precision=2, sep="=")
 
     def test_aggregation_1day_3procs(self):
@@ -106,10 +106,10 @@
         start = datetime.now()
         self.assertModule("t.rast.aggregate", input="A", output="B",
                           basename="b", granularity="1 day",
-                          method="sum", sampling=["contains"], 
+                          method="sum", sampling=["contains"],
                           nprocs=3, flags="s")
         end = datetime.now()
-        
+
         delta = end - start
         print "test_aggregation_1day_3procs:",  delta.total_seconds()
 
@@ -128,7 +128,7 @@
         info = SimpleModule("t.info", flags="g", input="B")
         #info.run()
         #print info.outputs.stdout
-        self.assertModuleKeyValue(module=info, reference=tinfo_string, 
+        self.assertModuleKeyValue(module=info, reference=tinfo_string,
                                   precision=2, sep="=")
 
     def test_aggregation_1day_2procs(self):
@@ -136,10 +136,10 @@
         start = datetime.now()
         self.assertModule("t.rast.aggregate", input="A", output="B",
                           basename="b", granularity="1 day",
-                          method="sum", sampling=["contains"], 
+                          method="sum", sampling=["contains"],
                           nprocs=2, flags="s")
         end = datetime.now()
-        
+
         delta = end - start
         print "test_aggregation_1day_2procs:",  delta.total_seconds()
 
@@ -158,7 +158,7 @@
         info = SimpleModule("t.info", flags="g", input="B")
         #info.run()
         #print info.outputs.stdout
-        self.assertModuleKeyValue(module=info, reference=tinfo_string, 
+        self.assertModuleKeyValue(module=info, reference=tinfo_string,
                                   precision=2, sep="=")
 if __name__ == '__main__':
     from grass.gunittest.main import test

Modified: grass/trunk/temporal/t.rast.aggregate/testsuite/test_aggregation_relative.py
===================================================================
--- grass/trunk/temporal/t.rast.aggregate/testsuite/test_aggregation_relative.py	2015-09-24 19:31:33 UTC (rev 66324)
+++ grass/trunk/temporal/t.rast.aggregate/testsuite/test_aggregation_relative.py	2015-09-25 09:19:06 UTC (rev 66325)
@@ -22,7 +22,7 @@
         os.putenv("GRASS_OVERWRITE",  "1")
         tgis.init()
         cls.use_temp_region()
-        cls.runModule("g.region",  s=0,  n=80,  w=0,  e=120,  b=0,  
+        cls.runModule("g.region",  s=0,  n=80,  w=0,  e=120,  b=0,
                       t=50,  res=10,  res3=10)
         cls.runModule("r.mapcalc", expression="a1 = 100",  overwrite=True)
         cls.runModule("r.mapcalc", expression="a2 = 200",  overwrite=True)
@@ -32,32 +32,32 @@
         cls.runModule("r.mapcalc", expression="a6 = 600",  overwrite=True)
         cls.runModule("r.mapcalc", expression="a7 = null()",  overwrite=True)
 
-        cls.runModule("t.create",  type="strds",  temporaltype="relative",  
-                                    output="A",  title="A test",  
+        cls.runModule("t.create",  type="strds",  temporaltype="relative",
+                                    output="A",  title="A test",
                                     description="A test",  overwrite=True)
 
-        cls.runModule("t.register", flags="i",  type="raster",  input="A",  
+        cls.runModule("t.register", flags="i",  type="raster",  input="A",
                                      maps="a1,a2,a3,a4,a5,a6,a7",
-                                     start=0, unit="days", increment=3, 
+                                     start=0, unit="days", increment=3,
                                      overwrite=True)
     @classmethod
     def tearDownClass(cls):
         """Remove the temporary region
         """
-        cls.del_temp_region()        
+        cls.del_temp_region()
         cls.runModule("t.remove", flags="rf", type="strds", inputs="A")
 
     def tearDown(self):
-        """Remove generated data"""    
+        """Remove generated data"""
         self.runModule("t.remove", flags="rf", type="strds", inputs="B")
 
     def test_1(self):
         """Simple test"""
         self.assertModule("t.rast.aggregate", input="A", output="B",
-                          basename="b", granularity=6, 
-                          method="average",
-                          sampling=["overlaps","overlapped","contains"], 
-                          nprocs=2)
+                          basename="b", granularity=6,
+                          method="average", file_limit=0,
+                          sampling=["overlaps","overlapped","contains"],
+                          nprocs=2, verbose=True)
 
         tinfo_string="""start_time=0
                         end_time=18
@@ -74,16 +74,16 @@
         info = SimpleModule("t.info", flags="g", input="B")
         #info.run()
         #print info.outputs.stdout
-        self.assertModuleKeyValue(module=info, reference=tinfo_string, 
+        self.assertModuleKeyValue(module=info, reference=tinfo_string,
                                   precision=2, sep="=")
 
     def test_2(self):
         """Simple test register null maps"""
         self.assertModule("t.rast.aggregate", input="A", output="B",
-                          basename="b", granularity=9, 
+                          basename="b", granularity=9,
                           method="maximum",
-                          sampling=["contains"], 
-                          nprocs=4, flags="n")
+                          sampling=["contains"],
+                          nprocs=4, flags="n", verbose=True)
 
         tinfo_string="""semantic_type=mean
                         start_time=0
@@ -101,16 +101,16 @@
         info = SimpleModule("t.info", flags="g", input="B")
         #info.run()
         #print info.outputs.stdout
-        self.assertModuleKeyValue(module=info, reference=tinfo_string, 
+        self.assertModuleKeyValue(module=info, reference=tinfo_string,
                                   precision=2, sep="=")
 
     def test_3(self):
         """Simple test"""
         self.assertModule("t.rast.aggregate", input="A", output="B",
-                          basename="b", granularity=9, 
+                          basename="b", granularity=9,
                           method="maximum",
-                          sampling=["contains"], 
-                          nprocs=4)
+                          sampling=["contains"],
+                          nprocs=4, verbose=True)
 
         tinfo_string="""semantic_type=mean
                         start_time=0
@@ -128,16 +128,16 @@
         info = SimpleModule("t.info", flags="g", input="B")
         #info.run()
         #print info.outputs.stdout
-        self.assertModuleKeyValue(module=info, reference=tinfo_string, 
+        self.assertModuleKeyValue(module=info, reference=tinfo_string,
                                   precision=2, sep="=")
 
     def test_4(self):
         """Simple test"""
         self.assertModule("t.rast.aggregate", input="A", output="B",
-                          basename="b", granularity=21, 
+                          basename="b", granularity=21,
                           method="average",
                           sampling=["contains"],
-                          nprocs=4)
+                          nprocs=4, verbose=True)
 
         tinfo_string="""semantic_type=mean
                         start_time=0
@@ -155,7 +155,7 @@
         info = SimpleModule("t.info", flags="g", input="B")
         #info.run()
         #print info.outputs.stdout
-        self.assertModuleKeyValue(module=info, reference=tinfo_string, 
+        self.assertModuleKeyValue(module=info, reference=tinfo_string,
                                   precision=2, sep="=")
 
 if __name__ == '__main__':



More information about the grass-commit mailing list