[GRASS-SVN] r64526 - in grass/trunk/lib/python/temporal: . testsuite

svn_grass at osgeo.org svn_grass at osgeo.org
Mon Feb 9 04:44:14 PST 2015


Author: huhabla
Date: 2015-02-09 04:44:14 -0800 (Mon, 09 Feb 2015)
New Revision: 64526

Modified:
   grass/trunk/lib/python/temporal/c_libraries_interface.py
   grass/trunk/lib/python/temporal/core.py
   grass/trunk/lib/python/temporal/testsuite/test_doctests.py
Log:
temporal library: Using a thread to observe the state of the libgis server process, since the fatal_error SIGABRT signal can not be catched in Python and the Python error handler function can not be registered in the libgis error handling system.


Modified: grass/trunk/lib/python/temporal/c_libraries_interface.py
===================================================================
--- grass/trunk/lib/python/temporal/c_libraries_interface.py	2015-02-09 11:12:36 UTC (rev 64525)
+++ grass/trunk/lib/python/temporal/c_libraries_interface.py	2015-02-09 12:44:14 UTC (rev 64526)
@@ -11,6 +11,9 @@
 :authors: Soeren Gebbert
 """
 
+from grass.exceptions import FatalError
+import time
+import threading
 import sys
 from multiprocessing import Process, Lock, Pipe
 import logging
@@ -723,7 +726,6 @@
 
 ###############################################################################
 
-
 def _stop(lock, conn, data):
     libgis.G_debug(1, "Stop C-interface server")
     conn.close()
@@ -735,6 +737,8 @@
 server_connection = None
 server_lock = None
 
+#def error_handler(data):
+#    server_connection.close()
 
 def c_library_server(lock, conn):
     """The GRASS C-libraries server function designed to be a target for
@@ -743,6 +747,17 @@
        :param lock: A multiprocessing.Lock
        :param conn: A multiprocessing.Pipe
     """
+    #global server_connection
+    #server_connection = conn
+ 
+    #CALLBACK = CFUNCTYPE(None, c_void_p)
+    #CALLBACK.restype = None
+    #CALLBACK.argtypes = c_void_p
+
+    #cerror_handler = CALLBACK(error_handler)
+    
+    #libgis.G_add_error_handler(cerror_handler, POINTER(None))
+
     # Crerate the function array
     functions = [0]*15
     functions[RPCDefs.STOP] = _stop
@@ -765,13 +780,12 @@
 
     while True:
         # Avoid busy waiting
-        conn.poll(4)
+        conn.poll(None)
         data = conn.recv()
         lock.acquire()
         functions[data[0]](lock, conn, data)
         lock.release()
 
-
 class CLibrariesInterface(object):
     """Fast and exit-safe interface to GRASS C-libraries functions
 
@@ -897,7 +911,31 @@
            >>> mapset = ciface.get_mapset()
            >>> location = ciface.get_location()
            >>> gisdbase = ciface.get_gisdbase()
+           
+           >>> ciface.fatal_error()
+           Traceback (most recent call last):
+               raise FatalError(message)
+           FatalError: Fatal error
+           
+           >>> mapset = ciface.get_mapset()
+           >>> location = ciface.get_location()
+           >>> gisdbase = ciface.get_gisdbase()
+           
+           >>> ciface.fatal_error()
+           Traceback (most recent call last):
+               raise FatalError(message)
+           FatalError: Fatal error
+           
+           >>> ciface.fatal_error()
+           Traceback (most recent call last):
+               raise FatalError(message)
+           FatalError: Fatal error
 
+           >>> ciface.fatal_error()
+           Traceback (most recent call last):
+               raise FatalError(message)
+           FatalError: Fatal error
+
            >>> gscript.del_temp_region()
 
     """
@@ -906,8 +944,40 @@
         self.server_conn = None
         self.queue = None
         self.server = None
+        self.checkThread = None
+        self.threadLock = threading.Lock()
         self.start_server()
+        self.start_checker_thread()
+        self.stopThread = False
+        
+    def start_checker_thread(self):
+        if self.checkThread is not None and self.checkThread.is_alive():
+            self.stop_checker_thread()
 
+        self.checkThread = threading.Thread(target=self.thread_checker)
+        self.checkThread.daemon = True
+        self.stopThread = False
+        self.checkThread.start()
+    
+    def stop_checker_thread(self):
+        self.threadLock.acquire()
+        self.stopThread = True
+        self.threadLock.release()
+        self.checkThread.join(None)
+        
+    def thread_checker(self):
+        """Check every 200 micro seconds if the server process is alive"""
+        while True:
+            time.sleep(0.2)
+            #sys.stderr.write("Check server process\n")
+            self._check_restart_server()
+            self.threadLock.acquire()
+            if self.stopThread == True:
+                #sys.stderr.write("Stop thread\n")
+                self.threadLock.release()
+                return
+            self.threadLock.release()
+
     def start_server(self):
         self.client_conn, self.server_conn = Pipe(True)
         self.lock = Lock()
@@ -916,16 +986,25 @@
         self.server.daemon = True
         self.server.start()
 
+    def check_server(self):
+        self._check_restart_server()
+    
     def _check_restart_server(self):
         """Restart the server if it was terminated
         """
+        self.threadLock.acquire()
+        
         if self.server.is_alive() is True:
+            self.threadLock.release()
             return
         self.client_conn.close()
         self.server_conn.close()
         self.start_server()
+
         logging.warning("Needed to restart the libgis server")
 
+        self.threadLock.release()
+
     def raster_map_exists(self, name, mapset):
         """Check if a raster map exists in the spatial database
 
@@ -933,10 +1012,10 @@
            :param mapset: The mapset of the map
            :returns: True if exists, False if not
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.MAP_EXISTS, RPCDefs.TYPE_RASTER,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("raster_map_exists")
 
     def read_raster_info(self, name, mapset):
         """Read the raster map info from the file system and store the content
@@ -947,10 +1026,10 @@
            :returns: The key value pairs of the map specific metadata,
                      or None in case of an error
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.READ_MAP_INFO, RPCDefs.TYPE_RASTER,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("read_raster_info")
 
     def has_raster_timestamp(self, name, mapset):
         """Check if a file based raster timetamp exists
@@ -959,10 +1038,10 @@
            :param mapset: The mapset of the map
            :returns: True if exists, False if not
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.HAS_TIMESTAMP, RPCDefs.TYPE_RASTER,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("has_raster_timestamp")
 
     def remove_raster_timestamp(self, name, mapset):
         """Remove a file based raster timetamp
@@ -974,10 +1053,10 @@
            :param mapset: The mapset of the map
            :returns: The return value of G_remove_raster_timestamp
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.REMOVE_TIMESTAMP, RPCDefs.TYPE_RASTER,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("remove_raster_timestamp")
 
     def read_raster_timestamp(self, name, mapset):
         """Read a file based raster timetamp
@@ -996,10 +1075,10 @@
            :param mapset: The mapset of the map
            :returns: The return value of G_read_raster_timestamp
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.READ_TIMESTAMP, RPCDefs.TYPE_RASTER,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("read_raster_timestamp")
 
     def write_raster_timestamp(self, name, mapset, timestring):
         """Write a file based raster timetamp
@@ -1015,10 +1094,10 @@
            :param timestring: A GRASS datetime C-library compatible string
            :returns: The return value of G_write_raster_timestamp
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.WRITE_TIMESTAMP, RPCDefs.TYPE_RASTER,
                                name, mapset, None, timestring])
-        return self.client_conn.recv()
+        return self.safe_receive("write_raster_timestamp")
 
     def raster3d_map_exists(self, name, mapset):
         """Check if a 3D raster map exists in the spatial database
@@ -1027,10 +1106,10 @@
            :param mapset: The mapset of the map
            :returns: True if exists, False if not
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.MAP_EXISTS, RPCDefs.TYPE_RASTER3D,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("raster3d_map_exists")
 
     def read_raster3d_info(self, name, mapset):
         """Read the 3D raster map info from the file system and store the content
@@ -1041,10 +1120,10 @@
            :returns: The key value pairs of the map specific metadata,
                      or None in case of an error
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.READ_MAP_INFO, RPCDefs.TYPE_RASTER3D,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("read_raster3d_info")
 
     def has_raster3d_timestamp(self, name, mapset):
         """Check if a file based 3D raster timetamp exists
@@ -1053,10 +1132,10 @@
            :param mapset: The mapset of the map
            :returns: True if exists, False if not
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.HAS_TIMESTAMP, RPCDefs.TYPE_RASTER3D,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("has_raster3d_timestamp")
 
     def remove_raster3d_timestamp(self, name, mapset):
         """Remove a file based 3D raster timetamp
@@ -1068,10 +1147,10 @@
            :param mapset: The mapset of the map
            :returns: The return value of G_remove_raster3d_timestamp
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.REMOVE_TIMESTAMP, RPCDefs.TYPE_RASTER3D,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("remove_raster3d_timestamp")
 
     def read_raster3d_timestamp(self, name, mapset):
         """Read a file based 3D raster timetamp
@@ -1090,10 +1169,10 @@
            :param mapset: The mapset of the map
            :returns: The return value of G_read_raster3d_timestamp
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.READ_TIMESTAMP, RPCDefs.TYPE_RASTER3D,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("read_raster3d_timestamp")
 
     def write_raster3d_timestamp(self, name, mapset, timestring):
         """Write a file based 3D raster timetamp
@@ -1109,10 +1188,10 @@
            :param timestring: A GRASS datetime C-library compatible string
            :returns: The return value of G_write_raster3d_timestamp
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.WRITE_TIMESTAMP, RPCDefs.TYPE_RASTER3D,
                                name, mapset, None, timestring])
-        return self.client_conn.recv()
+        return self.safe_receive("write_raster3d_timestamp")
 
     def vector_map_exists(self, name, mapset):
         """Check if a vector map exists in the spatial database
@@ -1121,10 +1200,10 @@
            :param mapset: The mapset of the map
            :returns: True if exists, False if not
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.MAP_EXISTS, RPCDefs.TYPE_VECTOR,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("vector_map_exists")
 
     def read_vector_info(self, name, mapset):
         """Read the vector map info from the file system and store the content
@@ -1135,10 +1214,10 @@
            :returns: The key value pairs of the map specific metadata,
                      or None in case of an error
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.READ_MAP_INFO, RPCDefs.TYPE_VECTOR,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("read_vector_info")
 
     def has_vector_timestamp(self, name, mapset, layer=None):
         """Check if a file based vector timetamp exists
@@ -1148,10 +1227,10 @@
            :param layer: The layer of the vector map
            :returns: True if exists, False if not
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.HAS_TIMESTAMP, RPCDefs.TYPE_VECTOR,
                                name, mapset, layer])
-        return self.client_conn.recv()
+        return self.safe_receive("has_vector_timestamp")
 
     def remove_vector_timestamp(self, name, mapset, layer=None):
         """Remove a file based vector timetamp
@@ -1164,10 +1243,10 @@
            :param layer: The layer of the vector map
            :returns: The return value of G_remove_vector_timestamp
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.REMOVE_TIMESTAMP, RPCDefs.TYPE_VECTOR,
                                name, mapset, layer])
-        return self.client_conn.recv()
+        return self.safe_receive("remove_vector_timestamp")
 
     def read_vector_timestamp(self, name, mapset, layer=None):
         """Read a file based vector timetamp
@@ -1187,10 +1266,10 @@
            :param layer: The layer of the vector map
            :returns: The return value ofG_read_vector_timestamp and the timestamps
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.READ_TIMESTAMP, RPCDefs.TYPE_VECTOR,
                                name, mapset, layer])
-        return self.client_conn.recv()
+        return self.safe_receive("read_vector_timestamp")
 
     def write_vector_timestamp(self, name, mapset, timestring, layer=None):
         """Write a file based vector timestamp
@@ -1207,19 +1286,19 @@
            :param layer: The layer of the vector map
            :returns: The return value of G_write_vector_timestamp
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.WRITE_TIMESTAMP, RPCDefs.TYPE_VECTOR,
                                name, mapset, layer, timestring])
-        return self.client_conn.recv()
+        return self.safe_receive("write_vector_timestamp")
 
     def available_mapsets(self):
         """Return all available mapsets the user can access as a list of strings
 
            :returns: Names of available mapsets as list of strings
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.AVAILABLE_MAPSETS, ])
-        return self.client_conn.recv()
+        return self.safe_receive("available_mapsets")
 
     def get_driver_name(self, mapset=None):
         """Return the temporal database driver of a specific mapset
@@ -1228,9 +1307,9 @@
 
            :returns: Name of the driver or None if no temporal database present
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.GET_DRIVER_NAME, mapset])
-        return self.client_conn.recv()
+        return self.safe_receive("get_driver_name")
 
     def get_database_name(self, mapset=None):
         """Return the temporal database name of a specific mapset
@@ -1239,36 +1318,36 @@
 
            :returns: Name of the database or None if no temporal database present
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.GET_DATABASE_NAME, mapset])
-        return self.client_conn.recv()
+        return self.safe_receive("get_database_name")
 
     def get_mapset(self):
         """Return the current mapset
 
            :returns: Name of the current mapset
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.G_MAPSET, ])
-        return self.client_conn.recv()
+        return self.safe_receive("get_mapsetn")
 
     def get_location(self):
         """Return the location
 
            :returns: Name of the location
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.G_LOCATION, ])
-        return self.client_conn.recv()
+        return self.safe_receive("get_location")
 
     def get_gisdbase(self):
         """Return the gisdatabase
 
            :returns: Name of the gisdatabase
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.G_GISDBASE, ])
-        return self.client_conn.recv()
+        return self.safe_receive("get_gisdbase")
 
     def fatal_error(self, mapset=None):
         """Return the temporal database name of a specific mapset
@@ -1277,21 +1356,35 @@
 
            :returns: Name of the database or None if no temporal database present
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.G_FATAL_ERROR])
+        # The pipe should be closed in the checker thread
+        return self.safe_receive("Fatal error")
 
+    def safe_receive(self, message):
+        """Receive the data and throw an FatalError exception in case the server 
+           process was killed and the pipe was closed by the checker thread"""
+        try:
+            return self.client_conn.recv()
+        except EOFError:
+            # The pipe was closed by the checker thread because
+            # the server process was killed
+            raise FatalError(message)
+
     def stop(self):
-        """Stop the messenger server and close the pipe
+        """Stop the check thread, the libgis server and close the pipe
 
            This method should be called at exit using the package atexit
         """
+        #sys.stderr.write("###### Stop was called\n")
+        self.stop_checker_thread()
         if self.server is not None and self.server.is_alive():
             self.client_conn.send([0, ])
-            self.server.join(5)
-            self.server.terminate()
+            self.server.join()
         if self.client_conn is not None:
             self.client_conn.close()
 
 if __name__ == "__main__":
     import doctest
     doctest.testmod()
+

Modified: grass/trunk/lib/python/temporal/core.py
===================================================================
--- grass/trunk/lib/python/temporal/core.py	2015-02-09 11:12:36 UTC (rev 64525)
+++ grass/trunk/lib/python/temporal/core.py	2015-02-09 12:44:14 UTC (rev 64526)
@@ -36,6 +36,7 @@
 import grass.script as gscript
 from datetime import datetime
 from c_libraries_interface import *
+from grass.pygrass import messages
 # Import all supported database backends
 # Ignore import errors since they are checked later
 try:
@@ -238,8 +239,8 @@
                               a fatal error, call sys.exit(1) otherwise
     """
     global message_interface
-    from grass.pygrass import messages
-    message_interface = messages.get_msgr(raise_on_error=raise_on_error)
+    if message_interface is None:
+        message_interface = messages.get_msgr(raise_on_error=raise_on_error)
 
 
 def get_tgis_message_interface():
@@ -266,7 +267,8 @@
        libraster, libraster3d and libvector functions
     """
     global c_library_interface
-    c_library_interface = CLibrariesInterface()
+    if c_library_interface is None:
+        c_library_interface = CLibrariesInterface()
 
 
 def get_tgis_c_library_interface():

Modified: grass/trunk/lib/python/temporal/testsuite/test_doctests.py
===================================================================
--- grass/trunk/lib/python/temporal/testsuite/test_doctests.py	2015-02-09 11:12:36 UTC (rev 64525)
+++ grass/trunk/lib/python/temporal/testsuite/test_doctests.py	2015-02-09 12:44:14 UTC (rev 64526)
@@ -22,12 +22,11 @@
     tests.addTests(doctest.DocTestSuite(grass.temporal.abstract_map_dataset))
     tests.addTests(doctest.DocTestSuite(grass.temporal.abstract_space_time_dataset))
     tests.addTests(doctest.DocTestSuite(grass.temporal.base))
-    tests.addTests(doctest.DocTestSuite(grass.temporal.c_libraries_interface))
     # Unexpected error here
-    #tests.addTests(doctest.DocTestSuite(grass.temporal.core))
+    ##tests.addTests(doctest.DocTestSuite(grass.temporal.core))
     tests.addTests(doctest.DocTestSuite(grass.temporal.datetime_math))
     # Unexpected error here
-    #tests.addTests(doctest.DocTestSuite(grass.temporal.list_stds))
+    ##tests.addTests(doctest.DocTestSuite(grass.temporal.list_stds))
     tests.addTests(doctest.DocTestSuite(grass.temporal.metadata))
     tests.addTests(doctest.DocTestSuite(grass.temporal.register))
     tests.addTests(doctest.DocTestSuite(grass.temporal.space_time_datasets))
@@ -44,6 +43,7 @@
     tests.addTests(doctest.DocTestSuite(grass.temporal.temporal_raster_base_algebra))
     tests.addTests(doctest.DocTestSuite(grass.temporal.temporal_operator))
     tests.addTests(doctest.DocTestSuite(grass.temporal.temporal_vector_algebra))
+    tests.addTests(doctest.DocTestSuite(grass.temporal.c_libraries_interface))
     return tests
 
 if __name__ == '__main__':



More information about the grass-commit mailing list