[GRASS-SVN] r38194 - grass-addons/vector/v.in.postgis
svn_grass at osgeo.org
svn_grass at osgeo.org
Fri Jul 3 12:46:59 EDT 2009
Author: mathieug
Date: 2009-07-03 12:46:59 -0400 (Fri, 03 Jul 2009)
New Revision: 38194
Added:
grass-addons/vector/v.in.postgis/v.in.postgis.py
Removed:
grass-addons/vector/v.in.postgis/v_in_postgis.py
grass-addons/vector/v.in.postgis/v_in_postgis_tests.py
Log:
merged importer class and unit test class in same file
Added: grass-addons/vector/v.in.postgis/v.in.postgis.py
===================================================================
--- grass-addons/vector/v.in.postgis/v.in.postgis.py (rev 0)
+++ grass-addons/vector/v.in.postgis/v.in.postgis.py 2009-07-03 16:46:59 UTC (rev 38194)
@@ -0,0 +1,691 @@
+#!/usr/bin/python
+#-*- coding: utf-8 -*-
+#
+############################################################################
+#
+# MODULE: v.in.postgis
+# AUTHOR(S): Mathieu Grelier, 2009 (greliermathieu at gmail.com)
+# PURPOSE: GRASS layer creation from arbitrary PostGIS sql queries
+# COPYRIGHT: (C) 2009 Mathieu Grelier
+#
+# This program is free software under the GNU General Public
+# License (>=v2). Read the file COPYING that comes with GRASS
+# for details.
+#
+#############################################################################
+
+#%Module
+#% description: Create a grass layer from any sql query in postgis
+#% keywords: postgis, db, sql
+#%End
+#%option
+#% key: query
+#% type: string
+#% description: Any sql query returning a recordset with geometry for each row
+#% required : no
+#%end
+#%option
+#% key: geometryfield
+#% type: string
+#% answer: the_geom
+#% description: Name of the source geometry field (usually defaults to the_geom)
+#% required : no
+#%end
+#%option
+#% key: output
+#% type: string
+#% answer: v_in_postgis
+#% description: Name of the imported grass layer (do not use capital letters)
+#% required : no
+#%end
+#%flag
+#% key: t
+#% description: Run tests and exit
+#%end
+#%flag
+#% key: d
+#% description: Import result in grass dbf format (no new table in postgis - if not set, new grass layer attributes will be connected to the result table)
+#%end
+#%flag
+#% key: o
+#% description: Use -o for v.in.ogr (override dataset projection)
+#%end
+#%flag
+#% key: g
+#% description: Add a gist index to the imported result table in postgis (useless with the d flag)
+#%end
+#%flag
+#% key: l
+#% description: Log process info to v_in_postgis.log
+#%end
+
+import sys
+import os
+import re
+from subprocess import Popen, PIPE
+import traceback
+
+##see http://initd.org/pub/software/psycopg/
+import psycopg2 as dbapi2
+##see http://trac.osgeo.org/grass/browser/grass/trunk/lib/python
+from grass import core as grass
+##only needed to use Komodo debugger with Komodo IDE. See http://aspn.activestate.com/ASPN/Downloads/Komodo/RemoteDebugging
+#from dbgp.client import brk
+##see http://pyunit.sourceforge.net/
+import unittest
+
+class GrassPostGisImporter():
+
+ def __init__(self, options, flags):
+ ##options
+ self.query = options['query']
+ self.geometryfield = options['geometryfield'] if options['geometryfield'].strip() != '' else 'the_geom'
+ self.output = options['output'] if options['output'].strip() != '' else 'postgis_import'
+ ##flags
+ self.dbfFlag = True if flags['d'] is True else False
+ self.overrideprojFlag = True if flags['o'] is True else False
+ self.gistindexFlag = True if flags['g'] is True else False
+ self.logOutput = True if flags['l'] is True else False
+ ##others
+ logfilename = 'v.in.postgis.log'
+ self.logfile = os.path.join(os.getenv('LOGDIR'),logfilename) if os.getenv('LOGDIR') else logfilename
+ grass.try_remove(self.logfile)
+ ##default for grass6 ; you may need to fix this path
+ self.grassloginfile = os.path.join(os.getenv('HOME'),'.grasslogin6')
+ ##retrieve connection parameters
+ self.dbparams = self.__getDbInfos()
+ ##set db connection
+ self.db = dbapi2.connect(host=self.dbparams['host'], database=self.dbparams['db'], \
+user=self.dbparams['user'], password=self.dbparams['pwd'])
+ self.cursor = self.db.cursor()
+ ## uncomment if not default
+ #dbapi2.paramstyle = 'pyformat'
+
+ def __writeLog(self, log=''):
+ """Write the 'log' string to log file."""
+ if self.logfile is not None:
+ fileHandle = open(self.logfile, 'a')
+ log = log + '\n'
+ fileHandle.write(log)
+ fileHandle.close()
+
+ def __getDbInfos(self):
+ """Create a dictionnary with all db params needed by v.in.ogr."""
+ try:
+ dbString = grass.parse_key_val(grass.read_command('db.connect', flags = 'p'), sep = ':')['database']
+ p = re.compile(',')
+ dbElems = p.split(dbString)
+ host = grass.parse_key_val(dbElems[0], sep = '=')['host']
+ db = grass.parse_key_val(dbElems[1], sep = '=')['dbname']
+ loginLine = self.executeCommand('sed -n "/pg ' + dbElems[0] + ','\
+ + dbElems[1] + ' /p" ' + self.grassloginfile, redirect = False, shell = True)
+ p = re.compile(' ')
+ loginElems = p.split(loginLine)
+ user = loginElems[-2].strip()
+ password = loginElems[-1].strip()
+ dbParamsDict = {'host':host, 'db':db, 'user':user, 'pwd':password}
+ return dbParamsDict
+ except:
+ raise GrassPostGisImporterError("Error while trying to retrieve database information.")
+
+ def executeCommand(self, *args, **kwargs):
+ """Command execution method using Popen in two modes : shell mode or not."""
+ p = None
+ shell = True if 'shell' in kwargs and kwargs['shell'] is True else False
+ redirection = True if 'redirect' in kwargs and kwargs['redirect'] is True else False
+ command = args[0]
+ if shell is True:
+ if redirection is True and self.logOutput is not False:
+ command = command + ' >> ' + self.logfile
+ ##use redirection on stdout only
+ command = command + " 2>&1"
+ p = Popen(command, shell = shell, stdout = PIPE)
+ else:
+ kwargs['stdout'] = PIPE
+ kwargs['stderr'] = PIPE
+ p = grass.start_command(*args, **kwargs)
+ retcode = p.wait()
+ com = p.communicate()
+ message = ''
+ r = re.compile('\n')
+ for std in com:
+ if std is not None:
+ lines = r.split(std)
+ for elem in lines:
+ message += str(elem).strip() + "\n"
+ if self.logOutput is not False and shell is False:
+ self.__writeLog(message)
+ if retcode == 0:
+ return message
+ else:
+ raise GrassPostGisImporterError(message)
+
+ def printMessage(self, message, type = 'info'):
+ """Call grass message function corresponding to type."""
+ if type == 'error':
+ grass.error(message)
+ elif type == 'warning':
+ grass.warning(message)
+ elif type == 'info' and grass.gisenv()['GRASS_VERBOSE'] > 0:
+ grass.info(message)
+ if self.logOutput is True:
+ self.__writeLog(message)
+
+ def checkLayers(self, output):
+ """Test if the grass layer 'output' already exists.
+ Note : for this to work with grass6.3, in find_file function from core.py,
+ command should be (n flag removed because 6.4 specific):
+ s = read_command("g.findfile", element = element, file = name, mapset = mapset)
+ """
+ self.printMessage("Check layers:")
+ ##Test if output vector map already exists.
+ testOutput = grass.find_file(output, element = 'vector')
+ if testOutput['fullname'] != '':
+ if not grass.overwrite() is True:
+ raise GrassPostGisImporterError("Vector map " + output + " already exists in mapset search path. \
+#Use the --o flag to overwrite.")
+ else:
+ self.printMessage("Vector map " + output + " will be overwritten.", type = 'warning')
+ return True
+
+ def checkComment(self, output):
+ """Test if a table with the 'output' existing in PostGis have been created by the importer."""
+ testIfTableNameAlreadyExistsQuery = "SELECT CAST(tablename AS text) FROM pg_tables \
+ WHERE schemaname='public' \
+ AND CAST(tablename AS text)='" + output + "'"
+ self.cursor.execute(testIfTableNameAlreadyExistsQuery)
+ rows = self.cursor.fetchone()
+ if rows is not None and len(rows) > 0:
+ testCheckCommentQuery = "SELECT obj_description((SELECT c.oid FROM pg_catalog.pg_class c \
+ WHERE c.relname='" + output + "'), 'pg_class') AS comment"
+ self.cursor.execute(testCheckCommentQuery)
+ comment = self.cursor.fetchone()
+ if comment is not None and len(comment) == 1:
+ comment = comment[0]
+ if comment == "created with v.in.postgis.py":
+ self.cursor.execute("DROP TABLE " + output)
+ else:
+ raise GrassPostGisImporterError("ERROR: a table with the name " + output + " already exists \
+ and was not created by this script.")
+ return True
+
+ def createPostgresTableFromQuery(self, output, query):
+ """Create a table in postgresql populated with results from the query, and tag it.
+ We will later be able to figure out if this table was created by the importer (see checkLayers())
+ """
+ try:
+ createTableQuery = "CREATE TABLE " + str(output) + " AS " + str(query)
+ if self.logOutput is True:
+ self.__writeLog("Try to import data:")
+ self.__writeLog(createTableQuery)
+ self.cursor.execute(createTableQuery)
+ addCommentQuery = "COMMENT ON TABLE " + output + " IS 'created with v.in.postgis.py'"
+ self.cursor.execute(addCommentQuery)
+ except:
+ raise GrassPostGisImporterError("An error occurred during sql import. Check your connection \
+ to the database and your sql query.")
+
+ def addCategory(self, output):
+ """Add a category column in the result table.
+ With the pg driver (not the dbf one), v.in.ogr need a 'cat' column for index creation
+ if -d flag wasn't not selected, can't import if query result already have a cat column
+ todo : add cat_ column in this case, as v.in.ogr with dbf driver do
+ """
+ try:
+ self.printMessage("Adding category column.")
+ s = "ALTER TABLE " + str(output) + " ADD COLUMN cat serial NOT NULL"
+ self.cursor.execute("ALTER TABLE " + str(output) + " ADD COLUMN cat serial NOT NULL")
+ tmp_pkey_name = "tmp_pkey_" + str(output)
+ self.cursor.execute("ALTER TABLE " + str(output) + " ADD CONSTRAINT " \
+ + tmp_pkey_name + " PRIMARY KEY (cat)")
+ except:
+ raise GrassPostGisImporterError("Unable to add a 'cat' column. A column named 'CAT' \
+ or 'cat' may be present in your input data. \
+ This column is reserved for Grass to store categories.")
+
+ def getGeometryInfo(self, output, geometryfield):
+ """Retrieve geometry parameters of the result.
+ We need to use the postgis AddGeometryColumn function so that v.in.ogr will work.
+ This method aims to retrieve necessary info for AddGeometryColumn to work.
+ Returns a dict with
+ """
+ self.printMessage("Retrieving geometry info.")
+ ##if there is more than one geometry type in the query result table, we use the
+ ##generic GEOMETRY type
+ type="GEOMETRY"
+ self.cursor.execute("SELECT DISTINCT GeometryType(" + geometryfield + ") FROM " + output)
+ rows = self.cursor.fetchall()
+ if rows is not None and len(rows) == 1:
+ type = str(rows[0][0])
+ if rows is None or len(rows) == 0:
+ raise GrassPostGisImporterError("Unable to retrieve geometry type")
+ ##same thing with number of dimensions. If the query is syntactically correct but returns
+ ##no geometry, this step will cause an error.
+ ndims = 0
+ self.cursor.execute("SELECT DISTINCT ndims(" + geometryfield + ") FROM " + output)
+ rows = self.cursor.fetchall()
+ if rows is not None and len(rows) == 1:
+ ndims = str(rows[0][0])
+ if self.logOutput is True:
+ self.__writeLog("ndims=" + ndims)
+ else:
+ raise GrassPostGisImporterError("unable to retrieve a unique coordinates dimension for \
+ this query or no geometry is present. Check your sql query.")
+ ##srid
+ srid="-1"
+ self.cursor.execute("SELECT DISTINCT srid(" + geometryfield + ") FROM " + output)
+ rows = self.cursor.fetchall()
+ if rows is not None and len(rows[0]) == 1:
+ srid = str(rows[0][0])
+ if self.logOutput is True:
+ self.__writeLog("srid=" + srid)
+ elif rows is not None and len(rows[0]) > 1:
+ if self.logOutput is True:
+ self.__writeLog("Unable to retrieve a unique geometry srid for this query. \
+ Using undefined srid.")
+ else:
+ raise GrassPostGisImporterError("Unable to retrieve geometry parameters.")
+ geoParamsDict = {'type':type, 'ndims':ndims, 'srid':srid}
+ return geoParamsDict
+
+ def addGeometry(self, output, geometryField, geoParams, addGistIndex=False):
+ """Create geometry for result."""
+ try:
+ ##first we must remove other geometry columns than selected one that may be present in the query result,
+ ##because v.in.ogr does not allow geometry columns selection
+ ##v.in.ogr takes the first geometry column found in the table so if another geometry is present,
+ ##as we use AddGeometryColumn fonction to copy selected geometry (see below), our geometry will
+ ##appear after other geometries in the column list. In this case, v.in.ogr would not import the
+ ##right geometry.
+ self.printMessage("Checking for other geometries.")
+ self.cursor.execute("SELECT column_name FROM(SELECT ordinal_position, column_name, udt_name \
+ FROM INFORMATION_SCHEMA.COLUMNS \
+ WHERE (TABLE_NAME='" + output + "') ORDER BY ordinal_position) AS info \
+ WHERE udt_name='geometry' AND NOT column_name='" + geometryField + "'")
+ rows = self.cursor.fetchall()
+ if rows is not None and len(rows) >= 1:
+ for otherGeoColumn in rows:
+ if self.logOutput is True:
+ self.__writeLog("Found another geometry in the query result than selected one: "\
+ + str(otherGeoColumn) + ". Column will be dropped.")
+ self.cursor.execute("ALTER TABLE " + output + " DROP COLUMN " + otherGeoColumn)
+ ##we already inserted the geometry so we will recopy it in the newly created geometry column
+ if self.logOutput is True:
+ self.__writeLog("Create geometry column.")
+ self.cursor.execute("ALTER TABLE " + output + " RENAME COLUMN " + geometryField + " TO the_geom_tmp")
+ self.cursor.execute("SELECT AddGeometryColumn('', '" + output + "','" + geometryField + "',\
+ "+ geoParams['srid'] + ",'" + geoParams['type'] + "'," + geoParams['ndims'] + ");")
+ self.cursor.execute("UPDATE " + output + " SET " + geometryField + " = the_geom_tmp")
+ self.cursor.execute("ALTER TABLE " + output + " DROP COLUMN the_geom_tmp")
+ if addGistIndex is True:
+ self.cursor.execute("CREATE INDEX " + output + "_index ON " + output + " \
+ USING GIST (" + geometryField + " GIST_GEOMETRY_OPS);")
+ except:
+ raise GrassPostGisImporterError("An error occured during geometry insertion.")
+
+ def importToGrass(self, output, geometryField, geoParams, toDbf = False, overrideProj = False):
+ """Wrapper for import with v.in.ogr and db connection of the result.
+ Note : for grass.gisenv() to work with grass6.3, in gisenv function from core.py,
+ command should be (n flag removed because 6.4 specific):
+ s = read_command("g.findfile", element = element, file = name, mapset = mapset)
+ """
+ ##dbf driver
+ flags = ''
+ outputname = output
+ if toDbf is True:
+ env = grass.gisenv()
+ dbfFolderPath = os.path.join(env['GISDBASE'].strip(";'"), env['LOCATION_NAME'].strip(";'"), \
+ env['MAPSET'].strip(";'"), 'dbf')
+ if not os.path.exists(dbfFolderPath):
+ os.mkdir(dbfFolderPath)
+ grass.run_command("db.connect", driver = 'dbf', database = dbfFolderPath)
+ else:
+ flags += 't'
+ if overrideProj is True:
+ flags += 'o'
+
+ ##finally call v.in.ogr
+ self.printMessage("call v.in.ogr...")
+ dsn="PG:host=" + self.dbparams['host'] + " dbname=" + self.dbparams['db'] \
+ + " user=" + self.dbparams['user'] + " password=" + self.dbparams['pwd']
+ layername = output
+ cmd = self.executeCommand("v.in.ogr", dsn = dsn, output = outputname, layer = layername, \
+ flags = flags, overwrite=True, quiet = False)
+ if toDbf is True:
+ grass.run_command("db.connect", driver = 'pg', database = 'host=' + self.dbparams['host'] + \
+ ",dbname=" + self.dbparams['db'])
+ self.cursor.execute('DROP TABLE ' + output)
+ ##else we work directly with postgis so the connection between imported grass layer
+ ##and postgres attribute table must be explicit
+ else:
+ ##can cause segfaults if mapset name is too long:
+ cmd = self.executeCommand("v.db.connect", map = output, table = output, flags = 'o')
+
+ ##delete temporary data in geometry_columns table
+ self.cursor.execute("DELETE FROM geometry_columns WHERE f_table_name = '" + output + "'")
+ pass
+
+ def commitChanges(self):
+ """Commit current transaction."""
+ self.db.commit()
+
+ def makeSqlImport(self):
+ """GrassPostGisImporter main sequence."""
+ ##1)check layers before starting
+ self.checkLayers(self.output)
+ ##2)query
+ self.createPostgresTableFromQuery(self.output, self.query)
+ ##3)cats
+ if self.dbfFlag is False:
+ self.addCategory(self.output)
+ ##4)geometry parameters
+ geoparams = self.getGeometryInfo(self.output, self.geometryfield)
+ ##5)new geometry
+ self.addGeometry(self.output, self.geometryfield, geoparams, self.gistindexFlag)
+ self.commitChanges()
+ ##6)v.in.ogr
+ self.importToGrass(self.output, self.geometryfield, geoparams, toDbf = self.dbfFlag, \
+ overrideProj = self.overrideprojFlag)
+ ##7)post-import operations
+ self.commitChanges()
+
+class GrassPostGisImporterError(Exception):
+ """Errors specific to GrassPostGisImporter class."""
+ def __init__(self, message=''):
+ self.details = '\nDetails:\n'
+ exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
+ self.details += repr(traceback.format_exception(exceptionType, exceptionValue, exceptionTraceback))
+ self.message = message + "\n" + self.details
+
+def main():
+ exitStatus = 0
+ try:
+ postgisImporter = GrassPostGisImporter(options, flags)
+ postgisImporter.makeSqlImport()
+ except GrassPostGisImporterError, e1:
+ postgisImporter.printMessage(e1.message, type = 'error')
+ exitStatus = 1
+ except:
+ exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
+ errorMessage = "Unexpected error \n:" + \
+ repr(traceback.format_exception(exceptionType, exceptionValue, exceptionTraceback))
+ postgisImporter.printMessage(errorMessage, type = 'error')
+ exitStatus = 1
+ else:
+ postgisImporter.printMessage("Done", type = 'info')
+ finally:
+ sys.exit(exitStatus)
+
+#############################################################################
+# A unit test for v.in.postgis
+#############################################################################
+
+options = {'query':'', 'geometryfield':'', 'output':''}
+flags = {'d':0, 'z':0, 'o':0, 'g':0, 'l':0}
+importer = GrassPostGisImporter(options, flags)
+##test configuration
+host = 'localhost'
+dbname = 'serveur_donnees_meteo'
+user = 'postgres'
+pwd = 'rootnculture'
+db = dbapi2.connect(host=host, database=dbname, user=user, password=pwd)
+cursor = db.cursor()
+testTableName = 'test_grass_import'
+queryTableName = 'test_query'
+geometryField = 'the_geom'
+srid = '2154'
+geoparams = {'type':'MULTIPOLYGON', 'ndims':'2', 'srid': srid}
+query = 'select * from ' + testTableName + ' where id>1'
+
+class GrassPostGisImporterTests(unittest.TestCase):
+
+ def setUp(self):
+ grass.run_command("db.connect", driver = 'pg', database = 'host=' + host + ",dbname=" + dbname)
+
+ def tearDown(self):
+ cleanUpQueryTable()
+
+ def testGetDbInfos(self):
+ """Test if the importer is able to retrieve correctly the parameters dictionnary for current connection."""
+ self.assertEqual(importer.dbparams['host'],host)
+ self.assertEqual(importer.dbparams['db'],dbname)
+ self.assertEqual(importer.dbparams['user'],user)
+ self.assertEqual(importer.dbparams['pwd'],pwd)
+
+ def testCheckLayers(self):
+ """Test if overwrite is working correctly."""
+ os.environ['GRASS_OVERWRITE'] = '0'
+ createQueryTable()
+ importer.addGeometry(queryTableName, geometryField, geoparams, False)
+ importer.commitChanges()
+ importer.importToGrass(queryTableName, geometryField, geoparams, toDbf = False, overrideProj = True)
+ importer.commitChanges()
+ ##GRASS_OVERWRITE set to False, we can't import again the layer
+ self.assertRaises(GrassPostGisImporterError, importer.checkLayers, queryTableName)
+ cleanUpQueryTable()
+ ##now set to True, it should be possible
+ os.environ['GRASS_OVERWRITE'] = '1'
+ createQueryTable()
+ importer.addGeometry(queryTableName, geometryField, geoparams, False)
+ importer.commitChanges()
+ importer.importToGrass(queryTableName, geometryField, geoparams, toDbf = False, overrideProj = True)
+ importer.commitChanges()
+ try:
+ importer.checkLayers(queryTableName)
+ except GrassPostGisImporterError:
+ self.fail("CheckLayers was expected to be successful with --o flag.")
+ pass
+
+ def testCheckComment(self):
+ """Test that we can't drop a table with the output name if it was not created by the importer."""
+ ##a table that was not tagged by the importer should not be overwritten
+ os.environ['GRASS_OVERWRITE'] = '1'
+ self.assertRaises(GrassPostGisImporterError, importer.checkComment, testTableName)
+ pass
+
+ def testImportGrassLayer(self):
+ """Test import sequence result in GRASS."""
+ ##preparation
+ importer.query = query
+ importer.output = queryTableName
+ importer.geometryfield = geometryField
+ importer.gistindexFlag = True
+ importer.overrideprojFlag = True
+ importer.dbfFlag = False
+ ##PostGIS import
+ importer.makeSqlImport()
+ testOutput = grass.find_file(queryTableName, element = 'vector')
+ self.assertTrue(testOutput['fullname'] != '')
+ pgdbconnectstring = "host=" + host + ",dbname=" + dbname
+ cmd = importer.executeCommand("v.db.connect", flags = 'g', map = queryTableName, driver = 'pg')
+ r = re.compile(' ')
+ dbinfos = r.split(cmd)
+ def trim(p): return str(p).strip()
+ dbinfos = map(trim, dbinfos)
+ self.assertEquals(dbinfos[0], '1')
+ self.assertEquals(dbinfos[1], queryTableName)
+ self.assertEquals(dbinfos[2], 'cat')
+ self.assertEquals(dbinfos[3], pgdbconnectstring)
+ self.assertEquals(dbinfos[4], 'pg')
+ ##cast is necessary
+ cmd = importer.executeCommand("echo 'SELECT CAST(COUNT (*) AS int) FROM " + queryTableName + "' | db.select -c", shell = True)
+ self.assertEquals(int(cmd[0]), 2)
+ cleanUpQueryTable()
+ #Dbf import
+ importer.dbfFlag = True
+ importer.makeSqlImport()
+ testOutput = grass.find_file(queryTableName, element = 'vector')
+ self.assertTrue(testOutput['fullname'] != '')
+ env = grass.gisenv()
+ dbfDbPath = os.path.join(env['GISDBASE'].strip(";'"), env['LOCATION_NAME'].strip(";'"), \
+ env['MAPSET'].strip(";'"), 'dbf')
+ cmd = importer.executeCommand("v.db.connect", flags = 'g', map = queryTableName, driver = 'dbf')
+ r = re.compile(' ')
+ dbinfos = r.split(cmd)
+ dbinfos = map(trim, dbinfos)
+ self.assertEquals(dbinfos[0], '1')
+ self.assertEquals(dbinfos[1], queryTableName)
+ self.assertEquals(dbinfos[2], 'cat')
+ self.assertEquals(dbinfos[3], dbfDbPath)
+ self.assertEquals(dbinfos[4], 'dbf')
+ cmd = importer.executeCommand("db.select", flags = 'c', table = queryTableName, \
+ database = dbfDbPath, driver = 'dbf')
+ r = re.compile('\n')
+ lines = r.split(cmd)
+ def validrecord(l): return len(str(l).strip()) > 0
+ result = filter(validrecord, lines)
+ self.assertEquals(len(result), 2)
+
+ def testGetGeometryInfos(self):
+ """Test that we correctly retrieve geometry parameters from PostGis result table."""
+ createQueryTable()
+ params = importer.getGeometryInfo(queryTableName, geometryField)
+ self.assertEquals(params['type'], geoparams['type'])
+ self.assertEquals(params['ndims'], geoparams['ndims'])
+ self.assertEquals(params['srid'], geoparams['srid'])
+ ##needed
+ importer.commitChanges()
+
+ def testAddingCategoryWithPgDriverIsNecessary(self):
+ """Test is the cat column addition is working and is still necessary with pg driver import.
+
+ cat column is necessary for GRASS to store categories.
+ For now, the pg driver for v.in.ogr doesn't doesn't add automatically this
+ cat column, whereas the dbf driver does. So the importer has a specific addCategory()
+ method, which necessity is tested here.
+ """
+ ##starting with postgis to dbf import : no need to use the addCategory() method is expected
+ createQueryTable()
+ importer.addGeometry(queryTableName, geometryField, geoparams, False)
+ importer.commitChanges()
+ importer.importToGrass(queryTableName, geometryField, geoparams, toDbf = True, overrideProj = True)
+ importer.commitChanges()
+ try:
+ cmd = importer.executeCommand("v.univar", map = queryTableName, column = 'value')
+ except GrassPostGisImporterError:
+ self.fail("Categories should be retrieved with dbf driver.")
+ cleanUpQueryTable()
+ ##now same operations with pg driver : error is expected when GRASS use the cat column.
+ createQueryTable()
+ importer.addGeometry(queryTableName, geometryField, geoparams, False)
+ importer.commitChanges()
+ importer.importToGrass(queryTableName, geometryField, geoparams, False, True)
+ importer.commitChanges()
+ self.assertRaises(GrassPostGisImporterError, \
+ importer.executeCommand, "v.univar", map = queryTableName, column = 'value')
+ cleanUpQueryTable()
+ ##now same operations with pg driver, after adding category column
+ ##with the importer : error should not occur.
+ createQueryTable()
+ importer.addGeometry(queryTableName, geometryField, geoparams, False)
+ importer.addCategory(queryTableName)
+ importer.commitChanges()
+ importer.importToGrass(queryTableName, geometryField, geoparams, False, True)
+ importer.commitChanges()
+ try:
+ cmd = importer.executeCommand("v.univar", map = queryTableName, column = 'value')
+ except GrassPostGisImporterError:
+ self.fail("Categories should be retrieved with pg driver when categories are added.")
+
+ def testGeometryDuplicationIsNecessary(self):
+ """Test that we need to use the postGis' AddGeometryColumn function."""
+ createQueryTable()
+ importer.addCategory(queryTableName)
+ importer.commitChanges()
+ self.assertRaises(GrassPostGisImporterError, importer.importToGrass, queryTableName, \
+ geometryField, geoparams, False, True)
+ cleanUpQueryTable()
+ createQueryTable()
+ ##now addGeometry:
+ importer.addGeometry(queryTableName, geometryField, geoparams, False)
+ importer.addCategory(queryTableName)
+ importer.commitChanges()
+ try:
+ importer.importToGrass(queryTableName, geometryField, geoparams, False, True)
+ except GrassPostGisImporterError:
+ self.fail("Both operations are for now expected to be necessary.")
+ pass
+
+def createQueryTable():
+ importer.createPostgresTableFromQuery(queryTableName, query)
+ importer.commitChanges()
+
+def cleanUpQueryTable():
+ db.rollback()
+ try:
+ importer.executeCommand("g.remove", vect = queryTableName, quiet = True)
+ except:
+ pass
+ try:
+ cursor.execute('DROP TABLE ' + queryTableName)
+ except:
+ pass
+ try:
+ cursor.execute("DELETE FROM geometry_columns WHERE f_table_name = '" + queryTableName + "'")
+ except:
+ pass
+ db.commit()
+
+def cleanUp():
+ db.rollback()
+ cleanUpQueryTable()
+ try:
+ importer.executeCommand("g.remove", vect = testTableName, quiet = True)
+ except:
+ pass
+ try:
+ cursor.execute('DROP TABLE ' + testTableName)
+ except:
+ pass
+ try:
+ cursor.execute("DELETE FROM geometry_columns WHERE f_table_name = '" + testTableName + "'")
+ except:
+ pass
+ db.commit()
+
+def tests():
+ currentDirectory = os.path.split(__file__)[0]
+ sys.path.append(currentDirectory)
+ ##test geo table
+ cursor.execute("CREATE TABLE " + testTableName + " ( id int4, label varchar(20), value real )")
+ cursor.execute("SELECT AddGeometryColumn('', '" + testTableName + "','" + geometryField + "'," + srid + ",'MULTIPOLYGON',2)")
+ cursor.execute("INSERT INTO " + testTableName + " (id, label, value, " + geometryField + ") VALUES (1, 'A Geometry', 10, \
+ GeomFromText('MULTIPOLYGON(((771825.9029201793 6880170.713342139,771861.2893824165 \
+ 6880137.00894025,771853.633171779 6880129.128272728,771818.4675156211 \
+ 6880162.7242448665,771825.9029201793 6880170.713342139)))', " + srid + "))")
+ cursor.execute("INSERT INTO " + testTableName + " (id, label, value, " + geometryField + ") VALUES (2, 'Another Geometry', 20, \
+ GeomFromText('MULTIPOLYGON(((771853.633171779 6880129.128272728,771842.2964842085 \
+ 6880117.197908178,771807.1308239839 6880150.79394592,771818.4675156211 \
+ 6880162.7242448665,771853.633171779 6880129.128272728)))', " + srid + "))")
+ cursor.execute("INSERT INTO " + testTableName + " (id, label, value, " + geometryField + ") VALUES (3, 'A last Geometry', 20, \
+ GeomFromText('MULTIPOLYGON(((771807.1308239839 6880150.79394592,771842.2964842085 \
+ 6880117.197908178,771831.1791767566 6880105.270296125,771795.7209431691 \
+ 6880138.862761175,771807.1308239839 6880150.79394592)))', " + srid + "))")
+ cursor.execute("ALTER TABLE " + testTableName + " ADD CONSTRAINT test_pkey " + " PRIMARY KEY (id)")
+ db.commit()
+ os.environ['GRASS_VERBOSE'] = '0'
+ try:
+ runner = unittest.TextTestRunner()
+ suite = unittest.TestSuite()
+ suite.addTest(GrassPostGisImporterTests("testGetDbInfos"))
+ suite.addTest(GrassPostGisImporterTests("testCheckLayers"))
+ suite.addTest(GrassPostGisImporterTests("testCheckComment"))
+ suite.addTest(GrassPostGisImporterTests("testImportGrassLayer"))
+ suite.addTest(GrassPostGisImporterTests("testGetGeometryInfos"))
+ suite.addTest(GrassPostGisImporterTests("testAddingCategoryWithPgDriverIsNecessary"))
+ suite.addTest(GrassPostGisImporterTests("testGeometryDuplicationIsNecessary"))
+ runner.run(suite)
+ finally:
+ cleanUp()
+ sys.exit(0)
+
+
+if __name__ == "__main__":
+ ### DEBUG : uncomment to start local debugging session
+ #brk(host="localhost", port=9000)
+ options, flags = grass.parser()
+ if flags['t'] is True:
+ tests()
+ else:
+ main()
+
Property changes on: grass-addons/vector/v.in.postgis/v.in.postgis.py
___________________________________________________________________
Added: svn:executable
+ *
Deleted: grass-addons/vector/v.in.postgis/v_in_postgis.py
===================================================================
--- grass-addons/vector/v.in.postgis/v_in_postgis.py 2009-07-03 16:44:22 UTC (rev 38193)
+++ grass-addons/vector/v.in.postgis/v_in_postgis.py 2009-07-03 16:46:59 UTC (rev 38194)
@@ -1,426 +0,0 @@
-#!/usr/bin/python
-#-*- coding: utf-8 -*-
-#
-############################################################################
-#
-# MODULE: v_in_postgis
-# AUTHOR(S): Mathieu Grelier, 2009 (greliermathieu at gmail.com)
-# PURPOSE: GRASS layer creation from arbitrary PostGIS sql queries
-# COPYRIGHT: (C) 2009 Mathieu Grelier
-#
-# This program is free software under the GNU General Public
-# License (>=v2). Read the file COPYING that comes with GRASS
-# for details.
-#
-#############################################################################
-
-#%Module
-#% description: Create a grass layer from any sql query in postgis
-#% keywords: postgis, db, sql
-#%End
-#%option
-#% key: query
-#% type: string
-#% description: Any sql query returning a recordset with geometry for each row
-#% required : yes
-#%end
-#%option
-#% key: geometryfield
-#% type: string
-#% answer: the_geom
-#% description: Name of the source geometry field (usually defaults to the_geom)
-#% required : yes
-#%end
-#%option
-#% key: output
-#% type: string
-#% answer: v_in_postgis
-#% description: Name of the imported grass layer (do not use capital letters)
-#% required : no
-#%end
-#%flag
-#% key: d
-#% description: Import result in grass dbf format (no new table in postgis - if not set, new grass layer attributes will be connected to the result table)
-#%end
-#%flag
-#% key: o
-#% description: Use -o for v.in.ogr (override dataset projection)
-#%end
-#%flag
-#% key: g
-#% description: Add a gist index to the imported result table in postgis (useless with the d flag)
-#%end
-#%flag
-#% key: l
-#% description: Log process info to v_in_postgis.log
-#%end
-
-import sys
-import os
-import re
-from subprocess import Popen, PIPE
-import traceback
-
-##see http://initd.org/pub/software/psycopg/
-import psycopg2 as dbapi2
-##see http://trac.osgeo.org/grass/browser/grass/trunk/lib/python
-from grass import core as grass
-##only needed to use Komodo debugger with Komodo IDE. See http://aspn.activestate.com/ASPN/Downloads/Komodo/RemoteDebugging
-#from dbgp.client import brk
-
-class GrassPostGisImporter():
-
- def __init__(self, options, flags):
- ##options
- self.query = options['query']
- self.geometryfield = options['geometryfield'] if options['geometryfield'].strip() != '' else 'the_geom'
- self.output = options['output'] if options['output'].strip() != '' else 'postgis_import'
- ##flags
- self.dbfFlag = True if flags['d'] is True else False
- self.overrideprojFlag = True if flags['o'] is True else False
- self.gistindexFlag = True if flags['g'] is True else False
- self.logOutput = True if flags['l'] is True else False
- ##others
- logfilename = 'v_in_postgis.log'
- self.logfile = os.path.join(os.getenv('LOGDIR'),logfilename) if os.getenv('LOGDIR') else logfilename
- grass.try_remove(self.logfile)
- ##default for grass6 ; you may need to fix this path
- self.grassloginfile = os.path.join(os.getenv('HOME'),'.grasslogin6')
- ##retrieve connection parameters
- self.dbparams = self.__getDbInfos()
- ##set db connection
- self.db = dbapi2.connect(host=self.dbparams['host'], database=self.dbparams['db'], \
-user=self.dbparams['user'], password=self.dbparams['pwd'])
- self.cursor = self.db.cursor()
- ## uncomment if not default
- #dbapi2.paramstyle = 'pyformat'
-
- def __writeLog(self, log=''):
- """Write the 'log' string to log file"""
- if self.logfile is not None:
- fileHandle = open(self.logfile, 'a')
- log = log + '\n'
- fileHandle.write(log)
- fileHandle.close()
-
- def __getDbInfos(self):
- """Create a dictionnary with all db params needed by v.in.ogr"""
- try:
- dbString = grass.parse_key_val(grass.read_command('db.connect', flags = 'p'), sep = ':')['database']
- p = re.compile(',')
- dbElems = p.split(dbString)
- host = grass.parse_key_val(dbElems[0], sep = '=')['host']
- db = grass.parse_key_val(dbElems[1], sep = '=')['dbname']
- loginLine = self.executeCommand('sed -n "/pg ' + dbElems[0] + ','\
- + dbElems[1] + ' /p" ' + self.grassloginfile, redirect = False, shell = True)
- p = re.compile(' ')
- loginElems = p.split(loginLine)
- user = loginElems[-2].strip()
- password = loginElems[-1].strip()
- dbParamsDict = {'host':host, 'db':db, 'user':user, 'pwd':password}
- return dbParamsDict
- except:
- raise GrassPostGisImporterError("Error while trying to retrieve database information.")
-
- def executeCommand(self, *args, **kwargs):
- """Command execution method using Popen in two modes : shell mode or not"""
- p = None
- shell = True if 'shell' in kwargs and kwargs['shell'] is True else False
- redirection = True if 'redirect' in kwargs and kwargs['redirect'] is True else False
- command = args[0]
- if shell is True:
- if redirection is True and self.logOutput is not False:
- command = command + ' >> ' + self.logfile
- ##use redirection on stdout only
- command = command + " 2>&1"
- p = Popen(command, shell = shell, stdout = PIPE)
- else:
- kwargs['stdout'] = PIPE
- kwargs['stderr'] = PIPE
- p = grass.start_command(*args, **kwargs)
- retcode = p.wait()
- com = p.communicate()
- message = ''
- r = re.compile('\n')
- for std in com:
- if std is not None:
- lines = r.split(std)
- for elem in lines:
- message += str(elem).strip() + "\n"
- if self.logOutput is not False and shell is False:
- self.__writeLog(message)
- if retcode == 0:
- return message
- else:
- raise GrassPostGisImporterError(message)
-
- def printMessage(self, message, type = 'info'):
- """Call grass message function corresponding to type"""
- if type == 'error':
- grass.error(message)
- elif type == 'warning':
- grass.warning(message)
- elif type == 'info' and grass.gisenv()['GRASS_VERBOSE'] > 0:
- grass.info(message)
- if self.logOutput is True:
- self.__writeLog(message)
-
- def checkLayers(self, output):
- """
- Test if the grass layer 'output' already exists.
-
- Note : for this to work with grass6.3, in find_file function from core.py,
- command should be (n flag removed because 6.4 specific):
- s = read_command("g.findfile", element = element, file = name, mapset = mapset)
- """
- self.printMessage("Check layers:")
- ##Test if output vector map already exists.
- testOutput = grass.find_file(output, element = 'vector')
- if testOutput['fullname'] != '':
- if not grass.overwrite() is True:
- raise GrassPostGisImporterError("Vector map " + output + " already exists in mapset search path. \
-#Use the --o flag to overwrite.")
- else:
- self.printMessage("Vector map " + output + " will be overwritten.", type = 'warning')
- return True
-
- def checkComment(self, output):
- """Test if a table with the 'output' existing in PostGis have been created by the importer"""
- testIfTableNameAlreadyExistsQuery = "SELECT CAST(tablename AS text) FROM pg_tables \
- WHERE schemaname='public' \
- AND CAST(tablename AS text)='" + output + "'"
- self.cursor.execute(testIfTableNameAlreadyExistsQuery)
- rows = self.cursor.fetchone()
- if rows is not None and len(rows) > 0:
- testCheckCommentQuery = "SELECT obj_description((SELECT c.oid FROM pg_catalog.pg_class c \
- WHERE c.relname='" + output + "'), 'pg_class') AS comment"
- self.cursor.execute(testCheckCommentQuery)
- comment = self.cursor.fetchone()
- if comment is not None and len(comment) == 1:
- comment = comment[0]
- if comment == "created_with_v_in_postgis.py":
- self.cursor.execute("DROP TABLE " + output)
- else:
- raise GrassPostGisImporterError("ERROR: a table with the name " + output + " already exists \
- and was not created by this script.")
- return True
-
- def createPostgresTableFromQuery(self, output, query):
- """
- Create a table in postgresql populated with results from the query, and comment it so we
- will later be able to figure out if this table was created by the importer (see checkLayers())
- """
- try:
- createTableQuery = "CREATE TABLE " + str(output) + " AS " + str(query)
- if self.logOutput is True:
- self.__writeLog("Try to import data:")
- self.__writeLog(createTableQuery)
- self.cursor.execute(createTableQuery)
- addCommentQuery = "COMMENT ON TABLE " + output + " IS 'created_with_v_in_postgis.py'"
- self.cursor.execute(addCommentQuery)
- except:
- raise GrassPostGisImporterError("An error occurred during sql import. Check your connection \
- to the database and your sql query.")
-
- def addCategory(self, output):
- """
- Add a category column in the result table
-
- With the pg driver (not the dbf one), v.in.ogr need a 'cat' column for index creation
- if -d flag wasn't not selected, can't import if query result already have a cat column
- todo : add cat_ column in this case, as v.in.ogr with dbf driver do
- """
- try:
- self.printMessage("Adding category column.")
- s = "ALTER TABLE " + str(output) + " ADD COLUMN cat serial NOT NULL"
- self.cursor.execute("ALTER TABLE " + str(output) + " ADD COLUMN cat serial NOT NULL")
- tmp_pkey_name = "tmp_pkey_" + str(output)
- self.cursor.execute("ALTER TABLE " + str(output) + " ADD CONSTRAINT " \
- + tmp_pkey_name + " PRIMARY KEY (cat)")
- except:
- raise GrassPostGisImporterError("Unable to add a 'cat' column. A column named 'CAT' \
- or 'cat' may be present in your input data. \
- This column is reserved for Grass to store categories.")
-
- def getGeometryInfo(self, output, geometryfield):
- """
- Retrieve geometry parameters of the result.
-
- We need to use the postgis AddGeometryColumn function so that v.in.ogr will work.
- This method aims to retrieve necessary info for AddGeometryColumn to work.
- Returns a dict with
- """
- self.printMessage("Retrieving geometry info.")
- ##if there is more than one geometry type in the query result table, we use the
- ##generic GEOMETRY type
- type="GEOMETRY"
- self.cursor.execute("SELECT DISTINCT GeometryType(" + geometryfield + ") FROM " + output)
- rows = self.cursor.fetchall()
- if rows is not None and len(rows) == 1:
- type = str(rows[0][0])
- if rows is None or len(rows) == 0:
- raise GrassPostGisImporterError("Unable to retrieve geometry type")
- ##same thing with number of dimensions. If the query is syntactically correct but returns
- ##no geometry, this step will cause an error.
- ndims = 0
- self.cursor.execute("SELECT DISTINCT ndims(" + geometryfield + ") FROM " + output)
- rows = self.cursor.fetchall()
- if rows is not None and len(rows) == 1:
- ndims = str(rows[0][0])
- if self.logOutput is True:
- self.__writeLog("ndims=" + ndims)
- else:
- raise GrassPostGisImporterError("unable to retrieve a unique coordinates dimension for \
- this query or no geometry is present. Check your sql query.")
- ##srid
- srid="-1"
- self.cursor.execute("SELECT DISTINCT srid(" + geometryfield + ") FROM " + output)
- rows = self.cursor.fetchall()
- if rows is not None and len(rows[0]) == 1:
- srid = str(rows[0][0])
- if self.logOutput is True:
- self.__writeLog("srid=" + srid)
- elif rows is not None and len(rows[0]) > 1:
- if self.logOutput is True:
- self.__writeLog("Unable to retrieve a unique geometry srid for this query. \
- Using undefined srid.")
- else:
- raise GrassPostGisImporterError("Unable to retrieve geometry parameters.")
- geoParamsDict = {'type':type, 'ndims':ndims, 'srid':srid}
- return geoParamsDict
-
- def addGeometry(self, output, geometryField, geoParams, addGistIndex=False):
- """Create geometry for result"""
- try:
- ##first we must remove other geometry columns than selected one that may be present in the query result,
- ##because v.in.ogr does not allow geometry columns selection
- ##v.in.ogr takes the first geometry column found in the table so if another geometry is present,
- ##as we use AddGeometryColumn fonction to copy selected geometry (see below), our geometry will
- ##appear after other geometries in the column list. In this case, v.in.ogr would not import the
- ##right geometry.
- self.printMessage("Checking for other geometries.")
- self.cursor.execute("SELECT column_name FROM(SELECT ordinal_position, column_name, udt_name \
- FROM INFORMATION_SCHEMA.COLUMNS \
- WHERE (TABLE_NAME='" + output + "') ORDER BY ordinal_position) AS info \
- WHERE udt_name='geometry' AND NOT column_name='" + geometryField + "'")
- rows = self.cursor.fetchall()
- if rows is not None and len(rows) >= 1:
- for otherGeoColumn in rows:
- if self.logOutput is True:
- self.__writeLog("Found another geometry in the query result than selected one: "\
- + str(otherGeoColumn) + ". Column will be dropped.")
- self.cursor.execute("ALTER TABLE " + output + " DROP COLUMN " + otherGeoColumn)
- ##we already inserted the geometry so we will recopy it in the newly created geometry column
- if self.logOutput is True:
- self.__writeLog("Create geometry column.")
- self.cursor.execute("ALTER TABLE " + output + " RENAME COLUMN " + geometryField + " TO the_geom_tmp")
- self.cursor.execute("SELECT AddGeometryColumn('', '" + output + "','" + geometryField + "',\
- "+ geoParams['srid'] + ",'" + geoParams['type'] + "'," + geoParams['ndims'] + ");")
- self.cursor.execute("UPDATE " + output + " SET " + geometryField + " = the_geom_tmp")
- self.cursor.execute("ALTER TABLE " + output + " DROP COLUMN the_geom_tmp")
- if addGistIndex is True:
- self.cursor.execute("CREATE INDEX " + output + "_index ON " + output + " \
- USING GIST (" + geometryField + " GIST_GEOMETRY_OPS);")
- except:
- raise GrassPostGisImporterError("An error occured during geometry insertion.")
-
- def importToGrass(self, output, geometryField, geoParams, toDbf = False, overrideProj = False):
- """Wrapper for import and db connection of the result
-
- Note : for grass.gisenv() to work with grass6.3, in gisenv function from core.py,
- command should be (n flag removed because 6.4 specific):
- s = read_command("g.findfile", element = element, file = name, mapset = mapset)
- """
- ##dbf driver
- flags = ''
- outputname = output
- if toDbf is True:
- env = grass.gisenv()
- dbfFolderPath = os.path.join(env['GISDBASE'].strip(";'"), env['LOCATION_NAME'].strip(";'"), \
- env['MAPSET'].strip(";'"), 'dbf')
- if not os.path.exists(dbfFolderPath):
- os.mkdir(dbfFolderPath)
- grass.run_command("db.connect", driver = 'dbf', database = dbfFolderPath)
- else:
- flags += 't'
- if overrideProj is True:
- flags += 'o'
-
- ##finally call v.in.ogr
- self.printMessage("call v.in.ogr...")
- dsn="PG:host=" + self.dbparams['host'] + " dbname=" + self.dbparams['db'] \
- + " user=" + self.dbparams['user'] + " password=" + self.dbparams['pwd']
- layername = output
- cmd = self.executeCommand("v.in.ogr", dsn = dsn, output = outputname, layer = layername, \
- flags = flags, overwrite=True, quiet = False)
- if toDbf is True:
- grass.run_command("db.connect", driver = 'pg', database = 'host=' + self.dbparams['host'] + \
- ",dbname=" + self.dbparams['db'])
- self.cursor.execute('DROP TABLE ' + output)
- ##else we work directly with postgis so the connection between imported grass layer
- ##and postgres attribute table must be explicit
- else:
- ##can cause segfaults if mapset name is too long:
- cmd = self.executeCommand("v.db.connect", map = output, table = output, flags = 'o')
-
- ##delete temporary data in geometry_columns table
- self.cursor.execute("DELETE FROM geometry_columns WHERE f_table_name = '" + output + "'")
- pass
-
- def commitChanges(self):
- """Commit current transaction"""
- self.db.commit()
-
- def makeSqlImport(self):
- """GrassPostGisImporter main sequence"""
- ##1)check layers before starting
- self.checkLayers(self.output)
- ##2)query
- self.createPostgresTableFromQuery(self.output, self.query)
- ##3)cats
- if self.dbfFlag is False:
- self.addCategory(self.output)
- ##4)geometry parameters
- geoparams = self.getGeometryInfo(self.output, self.geometryfield)
- ##5)new geometry
- self.addGeometry(self.output, self.geometryfield, geoparams, self.gistindexFlag)
- self.commitChanges()
- ##6)v.in.ogr
- self.importToGrass(self.output, self.geometryfield, geoparams, toDbf = self.dbfFlag, \
- overrideProj = self.overrideprojFlag)
- ##7)post-import operations
- self.commitChanges()
-
-class GrassPostGisImporterError(Exception):
- """Errors specific to GrassPostGisImporter class"""
- def __init__(self, message=''):
- self.details = '\nDetails:\n'
- exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
- self.details += repr(traceback.format_exception(exceptionType, exceptionValue, exceptionTraceback))
- self.message = message + "\n" + self.details
-
-def main():
- exitStatus = 0
- try:
- postgisImporter = GrassPostGisImporter(options, flags)
- postgisImporter.makeSqlImport()
- except GrassPostGisImporterError, e1:
- postgisImporter.printMessage(e1.message, type = 'error')
- exitStatus = 1
- except:
- exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
- errorMessage = "Unexpected error \n:" + \
- repr(traceback.format_exception(exceptionType, exceptionValue, exceptionTraceback))
- postgisImporter.printMessage(errorMessage, type = 'error')
- exitStatus = 1
- else:
- postgisImporter.printMessage("Done", type = 'info')
- finally:
- sys.exit(exitStatus)
-
-if __name__ == "__main__":
- ### DEBUG : uncomment to start local debugging session
- #brk(host="localhost", port=9000)
- options, flags = grass.parser()
- main()
-
Deleted: grass-addons/vector/v.in.postgis/v_in_postgis_tests.py
===================================================================
--- grass-addons/vector/v.in.postgis/v_in_postgis_tests.py 2009-07-03 16:44:22 UTC (rev 38193)
+++ grass-addons/vector/v.in.postgis/v_in_postgis_tests.py 2009-07-03 16:46:59 UTC (rev 38194)
@@ -1,311 +0,0 @@
-#!/usr/bin/python
-#-*- coding: utf-8 -*-
-#
-############################################################################
-#
-# MODULE: v_in_postgis_tests.py
-# AUTHOR(S): Mathieu Grelier, 2009 (greliermathieu at gmail.com)
-# PURPOSE: An attempt of a GRASS module unit test for v_in_postgis
-# COPYRIGHT: (C) 2009 Mathieu Grelier
-#
-# This program is free software under the GNU General Public
-# License (>=v2). Read the file COPYING that comes with GRASS
-# for details.
-#
-#############################################################################
-
-import sys
-import os
-import re
-
-##see http://trac.osgeo.org/grass/browser/grass/trunk/lib/python
-from grass import core as grass
-##only needed to use debugger with Komodo IDE. See http://aspn.activestate.com/ASPN/Downloads/Komodo/RemoteDebugging
-#from dbgp.client import brk
-##see http://pyunit.sourceforge.net/
-import unittest
-##see http://initd.org/pub/software/psycopg/
-import psycopg2 as dbapi2
-
-importer = None
-db = None
-cursor = None
-host = None
-dbname = None
-user = None
-pwd = None
-query = None
-testTableName = None
-queryTableName = None
-geometryField = None
-geoparams = None
-
-class v_in_postgis_tests(unittest.TestCase):
-
- def setUp(self):
- grass.run_command("db.connect", driver = 'pg', database = 'host=' + host + ",dbname=" + dbname)
-
- def tearDown(self):
- cleanUpQueryTable()
-
- def testGetDbInfos(self):
- """Test if the importer is able to retrieve correctly the parameters dictionnary for current connection."""
- self.assertEqual(importer.dbparams['host'],host)
- self.assertEqual(importer.dbparams['db'],dbname)
- self.assertEqual(importer.dbparams['user'],user)
- self.assertEqual(importer.dbparams['pwd'],pwd)
-
- def testCheckLayers(self):
- """Test if overwrite is working correctly"""
- from v_in_postgis import GrassPostGisImporterError
- os.environ['GRASS_OVERWRITE'] = '0'
- createQueryTable()
- importer.addGeometry(queryTableName, geometryField, geoparams, False)
- importer.commitChanges()
- importer.importToGrass(queryTableName, geometryField, geoparams, toDbf = False, overrideProj = True)
- importer.commitChanges()
- ##GRASS_OVERWRITE set to False, we can't import again the layer
- self.assertRaises(GrassPostGisImporterError, importer.checkLayers, queryTableName)
- cleanUpQueryTable()
- ##now set to True, it should be possible
- os.environ['GRASS_OVERWRITE'] = '1'
- createQueryTable()
- importer.addGeometry(queryTableName, geometryField, geoparams, False)
- importer.commitChanges()
- importer.importToGrass(queryTableName, geometryField, geoparams, toDbf = False, overrideProj = True)
- importer.commitChanges()
- try:
- importer.checkLayers(queryTableName)
- except Exception:
- self.fail("CheckLayers was expected to be successful with --o flag.")
- pass
-
- def testCheckComment(self):
- """Test that we can't drop a table with the output name if it was not created by the importer."""
- ##a table that was not tagged by the importer should not be overwritten
- from v_in_postgis import GrassPostGisImporterError
- os.environ['GRASS_OVERWRITE'] = '1'
- self.assertRaises(GrassPostGisImporterError, importer.checkComment, testTableName)
- pass
-
- def testImportGrassLayer(self):
- """Test import sequence result in GRASS"""
- ##preparation
- importer.query = query
- importer.output = queryTableName
- importer.geometryfield = geometryField
- importer.gistindexFlag = True
- importer.overrideprojFlag = True
- importer.dbfFlag = False
- ##PostGIS import
- importer.makeSqlImport()
- testOutput = grass.find_file(queryTableName, element = 'vector')
- self.assertTrue(testOutput['fullname'] != '')
- pgdbconnectstring = "host=" + host + ",dbname=" + dbname
- cmd = importer.executeCommand("v.db.connect", flags = 'g', map = queryTableName, driver = 'pg')
- r = re.compile(' ')
- dbinfos = r.split(cmd)
- def trim(p): return str(p).strip()
- dbinfos = map(trim, dbinfos)
- self.assertEquals(dbinfos[0], '1')
- self.assertEquals(dbinfos[1], queryTableName)
- self.assertEquals(dbinfos[2], 'cat')
- self.assertEquals(dbinfos[3], pgdbconnectstring)
- self.assertEquals(dbinfos[4], 'pg')
- ##cast is necessary
- cmd = importer.executeCommand("echo 'SELECT CAST(COUNT (*) AS int) FROM " + queryTableName + "' | db.select -c", shell = True)
- self.assertEquals(int(cmd[0]), 2)
- cleanUpQueryTable()
- #Dbf import
- importer.dbfFlag = True
- importer.makeSqlImport()
- testOutput = grass.find_file(queryTableName, element = 'vector')
- self.assertTrue(testOutput['fullname'] != '')
- env = grass.gisenv()
- dbfDbPath = os.path.join(env['GISDBASE'].strip(";'"), env['LOCATION_NAME'].strip(";'"), \
- env['MAPSET'].strip(";'"), 'dbf')
- cmd = importer.executeCommand("v.db.connect", flags = 'g', map = queryTableName, driver = 'dbf')
- r = re.compile(' ')
- dbinfos = r.split(cmd)
- dbinfos = map(trim, dbinfos)
- self.assertEquals(dbinfos[0], '1')
- self.assertEquals(dbinfos[1], queryTableName)
- self.assertEquals(dbinfos[2], 'cat')
- self.assertEquals(dbinfos[3], dbfDbPath)
- self.assertEquals(dbinfos[4], 'dbf')
- cmd = importer.executeCommand("db.select", flags = 'c', table = queryTableName, \
- database = dbfDbPath, driver = 'dbf')
- r = re.compile('\n')
- lines = r.split(cmd)
- def validrecord(l): return len(str(l).strip()) > 0
- result = filter(validrecord, lines)
- self.assertEquals(len(result), 2)
-
- def testGetGeometryInfos(self):
- """Test that we correctly retrieve geometry parameters from PostGis result table"""
- createQueryTable()
- params = importer.getGeometryInfo(queryTableName, geometryField)
- self.assertEquals(params['type'], geoparams['type'])
- self.assertEquals(params['ndims'], geoparams['ndims'])
- self.assertEquals(params['srid'], geoparams['srid'])
- ##needed
- importer.commitChanges()
-
- def testAddingCategoryWithPgDriverIsNecessary(self):
- """Test is the cat column addition is working and is still necessary with pg driver import
-
- cat column is necessary for GRASS to store categories.
- For now, the pg driver for v.in.ogr doesn't doesn't add automatically this
- cat column, whereas the dbf driver does. So the importer has a specific addCategory()
- method, which necessity is tested here.
- """
- ##starting with postgis to dbf import : no need to use the addCategory() method is expected
- from v_in_postgis import GrassPostGisImporterError
- createQueryTable()
- importer.addGeometry(queryTableName, geometryField, geoparams, False)
- importer.commitChanges()
- importer.importToGrass(queryTableName, geometryField, geoparams, toDbf = True, overrideProj = True)
- importer.commitChanges()
- try:
- cmd = importer.executeCommand("v.univar", map = queryTableName, column = 'value')
- except GrassPostGisImporterError:
- self.fail("Categories should be retrieved with dbf driver.")
- cleanUpQueryTable()
- ##now same operations with pg driver : error is expected when GRASS use the cat column.
- createQueryTable()
- importer.addGeometry(queryTableName, geometryField, geoparams, False)
- importer.commitChanges()
- importer.importToGrass(queryTableName, geometryField, geoparams, False, True)
- importer.commitChanges()
- self.assertRaises(GrassPostGisImporterError, \
- importer.executeCommand, "v.univar", map = queryTableName, column = 'value')
- cleanUpQueryTable()
- ##now same operations with pg driver, after adding category column
- ##with the importer : error should not occur.
- createQueryTable()
- importer.addGeometry(queryTableName, geometryField, geoparams, False)
- importer.addCategory(queryTableName)
- importer.commitChanges()
- importer.importToGrass(queryTableName, geometryField, geoparams, False, True)
- importer.commitChanges()
- try:
- cmd = importer.executeCommand("v.univar", map = queryTableName, column = 'value')
- except GrassPostGisImporterError:
- self.fail("Categories should be retrieved with pg driver when categories are added.")
-
- def testGeometryDuplicationIsNecessary(self):
- """Test that we need to use the postGis' AddGeometryColumn function"""
- createQueryTable()
- importer.addCategory(queryTableName)
- importer.commitChanges()
- from v_in_postgis import GrassPostGisImporterError
- self.assertRaises(GrassPostGisImporterError, importer.importToGrass, queryTableName, \
- geometryField, geoparams, False, True)
- cleanUpQueryTable()
- createQueryTable()
- ##now addGeometry:
- importer.addGeometry(queryTableName, geometryField, geoparams, False)
- importer.addCategory(queryTableName)
- importer.commitChanges()
- try:
- importer.importToGrass(queryTableName, geometryField, geoparams, False, True)
- except Exception:
- self.fail("Both operations are for now expected to be necessary.")
- pass
-
-def createQueryTable():
- importer.createPostgresTableFromQuery(queryTableName, query)
- importer.commitChanges()
-
-def cleanUpQueryTable():
- db.rollback()
- try:
- importer.executeCommand("g.remove", vect = queryTableName, quiet = True)
- cursor.execute('DROP TABLE ' + queryTableName)
- except:
- pass
- try:
- cursor.execute("DELETE FROM geometry_columns WHERE f_table_name = '" + queryTableName + "'")
- except:
- pass
- db.commit()
-
-def cleanUp():
- db.rollback()
- cleanUpQueryTable()
- try:
- importer.executeCommand("g.remove", vect = testTableName, quiet = True)
- cursor.execute('DROP TABLE ' + testTableName)
- except:
- pass
- try:
- cursor.execute("DELETE FROM geometry_columns WHERE f_table_name = '" + testTableName + "'")
- except:
- pass
- db.commit()
-
-def suite():
- alltests = unittest.TestSuite()
- alltests.addTest(unittest.findTestCases(module))
- return alltests
-
-if __name__ == '__main__':
- ### DEBUG : uncomment to start local debugging session
- #brk(host="localhost", port=9000)
- currentDirectory = os.path.split(__file__)[0]
- sys.path.append(currentDirectory)
- from v_in_postgis import GrassPostGisImporter
- options = {'query':'', 'geometryfield':'', 'output':''}
- flags = {'d':0, 'z':0, 'o':0, 'g':0, 'l':0}
- importer = GrassPostGisImporter(options, flags)
- module = __import__('v_in_postgis_tests')
- ##test configuration
- host = 'localhost'
- dbname = 'yourdb'
- user = 'yourusername'
- pwd = 'yourpwd'
- db = dbapi2.connect(host=host, database=dbname, user=user, password=pwd)
- cursor = db.cursor()
- testTableName = 'test_grass_import'
- queryTableName = 'test_query'
- geometryField = 'the_geom'
- srid = '2154'
- geoparams = {'type':'MULTIPOLYGON', 'ndims':'2', 'srid': srid}
- query = 'select * from ' + testTableName + ' where id>1'
- ##give access to test elements to module
- module.importer = importer
- module.db = db
- module.cursor = cursor
- module.host = host
- module.dbname = dbname
- module.user = user
- module.pwd = pwd
- module.testTableName = testTableName
- module.queryTableName = queryTableName
- module.geometryField = geometryField
- module.geoparams = geoparams
- module.query = query
- ##test geo table
- cursor.execute("CREATE TABLE " + testTableName + " ( id int4, label varchar(20), value real )")
- cursor.execute("SELECT AddGeometryColumn('', '" + testTableName + "','" + geometryField + "'," + srid + ",'MULTIPOLYGON',2)")
- cursor.execute("INSERT INTO " + testTableName + " (id, label, value, " + geometryField + ") VALUES (1, 'A Geometry', 10, \
- GeomFromText('MULTIPOLYGON(((771825.9029201793 6880170.713342139,771861.2893824165 \
- 6880137.00894025,771853.633171779 6880129.128272728,771818.4675156211 \
- 6880162.7242448665,771825.9029201793 6880170.713342139)))', " + srid + "))")
- cursor.execute("INSERT INTO " + testTableName + " (id, label, value, " + geometryField + ") VALUES (2, 'Another Geometry', 20, \
- GeomFromText('MULTIPOLYGON(((771853.633171779 6880129.128272728,771842.2964842085 \
- 6880117.197908178,771807.1308239839 6880150.79394592,771818.4675156211 \
- 6880162.7242448665,771853.633171779 6880129.128272728)))', " + srid + "))")
- cursor.execute("INSERT INTO " + testTableName + " (id, label, value, " + geometryField + ") VALUES (3, 'A last Geometry', 20, \
- GeomFromText('MULTIPOLYGON(((771807.1308239839 6880150.79394592,771842.2964842085 \
- 6880117.197908178,771831.1791767566 6880105.270296125,771795.7209431691 \
- 6880138.862761175,771807.1308239839 6880150.79394592)))', " + srid + "))")
- cursor.execute("ALTER TABLE " + testTableName + " ADD CONSTRAINT test_pkey " + " PRIMARY KEY (id)")
- db.commit()
- os.environ['GRASS_VERBOSE'] = '0'
- try:
- unittest.main(defaultTest='suite')
- finally:
- module.cleanUp()
- sys.exit(0)
-
More information about the grass-commit
mailing list