[GRASS-SVN] r40696 - grass/trunk/raster/r.li/r.li.daemon

svn_grass at osgeo.org svn_grass at osgeo.org
Wed Jan 27 08:43:56 EST 2010


Author: glynn
Date: 2010-01-27 08:43:55 -0500 (Wed, 27 Jan 2010)
New Revision: 40696

Modified:
   grass/trunk/raster/r.li/r.li.daemon/daemon.c
   grass/trunk/raster/r.li/r.li.daemon/daemon.h
   grass/trunk/raster/r.li/r.li.daemon/ipc.c
   grass/trunk/raster/r.li/r.li.daemon/ipc.h
   grass/trunk/raster/r.li/r.li.daemon/worker.c
Log:
Eliminate client-server framework


Modified: grass/trunk/raster/r.li/r.li.daemon/daemon.c
===================================================================
--- grass/trunk/raster/r.li/r.li.daemon/daemon.c	2010-01-27 10:44:09 UTC (rev 40695)
+++ grass/trunk/raster/r.li/r.li.daemon/daemon.c	2010-01-27 13:43:55 UTC (rev 40696)
@@ -41,13 +41,11 @@
 {
 
     char pathSetup[GPATH_MAX], out[GPATH_MAX], parsed;
-    char *reportChannelName, *random_access_name;
+    char *random_access_name;
     struct History history;
     g_areas g;
-    int receiveChannel;
     int res;
-    wd child[WORKERS];
-    int i, mypid, doneDir, withoutJob, mv_fd, random_access;
+    int i, doneDir, mv_fd, random_access;
 
     /*int mv_rows, mv_cols; */
     list l;
@@ -57,48 +55,9 @@
 
     g = (g_areas) G_malloc(sizeof(struct generatore));
     l = (list) G_malloc(sizeof(struct lista));
-    mypid = getpid();
 
-    /* create report pipe */
-    reportChannelName = G_tempfile();
-    if (mkfifo(reportChannelName, 0644) == -1)
-	G_fatal_error("Error in pipe creation");
+    worker_init(raster, f, parameters);
 
-
-    /*###############################################
-       --------------create childs-------------------
-       ############################################### */
-
-    i = 0;
-    while (i < WORKERS) {
-	int childpid;
-
-	/*creating pipe */
-	child[i].pipe = G_tempfile();
-	if (mkfifo(child[i].pipe, 0755) == -1)
-	    G_fatal_error(_("Error in pipe creation"));
-	childpid = fork();
-	if (childpid) {
-	    /*father process */
-	    child[i].pid = childpid;
-	    child[i].channel = open(child[i].pipe, O_WRONLY | O_CREAT, 0755);
-
-	    if (child[i].channel == -1) {
-		G_fatal_error(_("Error opening channel %i"), i);
-	    }
-	    i++;
-	}
-	else {
-	    /*child process */
-	    worker(raster, f, reportChannelName, child[i].pipe, parameters);
-	    exit(0);
-	}
-    }
-
-    /*open reportChannel */
-    receiveChannel = open(reportChannelName, O_RDONLY, 0755);
-
-
     /*########################################################      
        -----------------create area queue----------------------
        ######################################################### */
@@ -110,7 +69,7 @@
 	file += strlen(testpath);
 
     /* TODO: check if this path is portable */
-/* TODO: use G_rc_path() */
+    /* TODO: use G_rc_path() */
     sprintf(pathSetup, "%s/.grass7/r.li/%s", G_home(), file);
     G_debug(1, "r.li.daemon pathSetup: [%s]", pathSetup);
     parsed = parseSetup(pathSetup, l, g, raster);
@@ -161,23 +120,17 @@
        ------------------analysis loop----------------------
        ####################################################### */
     /*first job scheduling */
-    while ((i < WORKERS) && next_Area(parsed, l, g, &m) != 0) {
-	send(child[i].channel, &m);
-	i++;
-    }
+    if (next_Area(parsed, l, g, &m) != 0)
 
-
     /*body */
     while (next_Area(parsed, l, g, &m) != 0) {
-	int j = 0, donePid;
+	worker_process(&doneJob, &m);
 
-	receive(receiveChannel, &doneJob);
 	/*perc++; */
 	/*G_percent (perc, WORKERS, 1); */
 	if (doneJob.type == DONE) {
 	    double result;
 
-	    donePid = doneJob.f.f_d.pid;
 	    result = doneJob.f.f_d.res;
 	    /*output */
 	    if (parsed != MVWIN) {
@@ -190,7 +143,6 @@
 	    }
 	}
 	else {
-	    donePid = doneJob.f.f_e.pid;
 	    if (parsed != MVWIN) {
 		error_Output(res, doneJob);
 	    }
@@ -198,97 +150,10 @@
 		/*printf("todo ");fflush(stdout); *//* TODO scrivere su raster NULL ??? */
 	    }
 	}
-	j = 0;
-
-
-	while (j < WORKERS && donePid != child[j].pid)
-	    j++;
-	send(child[j].channel, &m);
-
     }
 
-    /*kill childs */
-    withoutJob = i;
-    while (i > 0) {
-	int j = 0, donePid, status;
+    worker_end();
 
-	receive(receiveChannel, &doneJob);
-	if (doneJob.type == DONE) {
-	    double result;
-
-	    donePid = doneJob.f.f_d.pid;
-	    result = doneJob.f.f_d.res;
-	    if (parsed != MVWIN) {
-		print_Output(res, doneJob);
-	    }
-	    else {
-		/* raster */
-		raster_Output(random_access, doneJob.f.f_d.aid, g,
-			      doneJob.f.f_d.res);
-	    }
-	}
-	else {
-	    donePid = doneJob.f.f_e.pid;
-	    if (parsed != MVWIN) {
-		error_Output(res, doneJob);
-	    }
-	    else {
-		/*printf("todo2 ");fflush(stdout); *//*TODO scrivere su raster */
-	    }
-	}
-
-	i--;
-
-	while (j < WORKERS && donePid != child[j].pid)
-	    j++;
-
-	m.type = TERM;
-	m.f.f_t.pid = mypid;
-	send(child[j].channel, &m);
-	wait(&status);
-
-	if (!(WIFEXITED(status)))
-	    G_warning(
-		    _("r.li.worker (pid %i) exited with abnormal status: %i"),
-		    donePid, status);
-	else
-	    G_verbose_message(
-		    _("r.li.worker (pid %i) terminated successfully"),
-		    donePid);
-
-	/* remove pipe */
-	if (close(child[j].channel) != 0)
-	    G_message(_("Cannot close %s file (PIPE)"), child[j].pipe);
-	if (unlink(child[j].pipe) != 0)
-	    G_message(_("Cannot delete %s file (PIPE)"), child[j].pipe);
-    }
-
-    /* kill children without Job */
-    for (i = withoutJob; i < WORKERS; i++) {
-	int status;
-
-	m.type = TERM;
-	m.f.f_t.pid = mypid;
-	send(child[i].channel, &m);
-	wait(&status);
-
-	if (!(WIFEXITED(status)))
-	    G_warning(
-		    _("r.li.worker (pid %i) exited with abnormal status: %i"),
-		    child[i].pid, status);
-	else
-	    G_verbose_message(
-		    _("r.li.worker (pid %i) terminated successfully"),
-		    child[i].pid);
-
-	/* remove pipe */
-	if (close(child[i].channel) != 0)
-	    G_message(_("Cannot close %s file (PIPE2)"), child[i].pipe);
-	if (unlink(child[i].pipe) != 0)
-	    G_message(_("Cannot delete %s file (PIPE2)"), child[i].pipe);
-    }
-
-
     /*################################################
        --------------delete tmp files------------------
        ################################################ */
@@ -303,12 +168,6 @@
 	Rast_write_history(output, &history);
     }
 
-    if (close(receiveChannel) != 0)
-	G_message(_("Cannot close receive channel file"));
-
-    if (unlink(reportChannelName) != 0)
-	G_message(_("Cannot delete %s file"), child[i].pipe);
-
     return 1;
 }
 
@@ -539,7 +398,7 @@
 	    G_fatal_error(_("Too many units to place"));
 	assigned = G_malloc(units * sizeof(int));
 	i = 0;
-	srandom(getpid());
+	srandom(0);
 	while (i < units) {
 	    int j, position, found = FALSE;
 
@@ -607,7 +466,7 @@
 	if (r_strat_len < g->rl || c_strat_len < g->cl)
 	    G_fatal_error(_("Too many strats for raster map"));
 	loop = r_strat * c_strat;
-	srandom(getpid());
+	srandom(0);
 	for (i = 0; i < loop; i++) {
 	    msg m;
 

Modified: grass/trunk/raster/r.li/r.li.daemon/daemon.h
===================================================================
--- grass/trunk/raster/r.li/r.li.daemon/daemon.h	2010-01-27 10:44:09 UTC (rev 40695)
+++ grass/trunk/raster/r.li/r.li.daemon/daemon.h	2010-01-27 13:43:55 UTC (rev 40696)
@@ -25,7 +25,6 @@
 /**
  * \brief number of r.li.workers to use
  */
-#define WORKERS 10
 #define NORMAL 1
 #define MVWIN 2
 #define GEN 3
@@ -200,12 +199,12 @@
  * \brief client implementation
  * \param raster the raster map to analyze
  * \param f the function used for index computing
- * \param server_channel the channel where to send the result
- * \param mychannel the channel where to receive the area messages
  * \param result where to put the result of index computing
  */
-void worker(char *raster, int f(int, char **, area_des, double *),
-	    char *server_channel, char *mychannel, char **parameters);
+void worker_init(char *raster, int f(int, char **, area_des, double *),
+		 char **parameters);
+void worker_process(msg *ret, msg *m);
+void worker_end(void);
 
  /**
   * \brief adapts the mask at current raster file

Modified: grass/trunk/raster/r.li/r.li.daemon/ipc.c
===================================================================
--- grass/trunk/raster/r.li/r.li.daemon/ipc.c	2010-01-27 10:44:09 UTC (rev 40695)
+++ grass/trunk/raster/r.li/r.li.daemon/ipc.c	2010-01-27 13:43:55 UTC (rev 40696)
@@ -1,86 +0,0 @@
-
-/**
- * \file IPC.c
- *
- * \brief implementation of interprocess communication
- *	primitives between r.li.daemon and r.li.worker
- *
- *
- * This program is free software under the GPL (>=v2)
- * Read the COPYING file that comes with GRASS for details.
- *
- *
- * \author Lucio Davide Spano
- * 
- * \version 1.0
- * 
- */
-
-
-#include <unistd.h>
-#include <grass/gis.h>
-#include <grass/glocale.h>
-#include "ipc.h"
-
-
-int send(int pipe, msg * m)
-{
-    int check;
-
-    /* write on pipe */
-    check = write(pipe, m, sizeof(msg));
-    if (check > 0)
-	return 1;
-    else
-	return 0;
-}
-
-int receive(int pipe, msg * m)
-{
-    return read(pipe, m, sizeof(msg));
-}
-
-void printMsg(msg m)
-{
-
-    switch (m.type) {
-    case AREA:{
-	    G_message(_("				AREA MESSAGE: \n \
-				aid = %i \n \
-				x = %i \n \
-				y = %i \n \
-				rl = %i \n \
-				cl = %i \n "), m.f.f_a.aid, m.f.f_a.x, m.f.f_a.y, m.f.f_a.rl, m.f.f_a.cl);
-	}
-	break;
-    case MASKEDAREA:{
-	    G_message(_(" 				MASKEDAREA MESSAGE: \n \
-				aid = %i \n \
-				x = %i \n \
-				y = %i \n \
-				rl = %i \n \
-				cl = %i \n \
-				mask = %s \n "),
-		      m.f.f_ma.aid, m.f.f_ma.x, m.f.f_ma.y, m.f.f_ma.rl, m.f.f_ma.cl, m.f.f_ma.mask);
-	}
-	break;
-    case DONE:{
-	    G_message(_(" 				DONE MESSAGE: \n \
-				aid = %i \n \
-				pid = %i \n \
-				result = %f \n "), m.f.f_d.aid, m.f.f_d.pid, m.f.f_d.res);
-	}
-	break;
-    case ERROR:{
-	    G_message(_(" 				ERROR MESSAGE: \n \
-				aid = %i \n \
-				pid = %i \n "), m.f.f_e.aid, m.f.f_e.pid);
-	}
-	break;
-    case TERM:{
-	    G_message(_(" 				TERM MESSAGE: \n \
-				pid = %i \n "), m.f.f_t.pid);
-	}
-	break;
-    }
-}

Modified: grass/trunk/raster/r.li/r.li.daemon/ipc.h
===================================================================
--- grass/trunk/raster/r.li/r.li.daemon/ipc.h	2010-01-27 10:44:09 UTC (rev 40695)
+++ grass/trunk/raster/r.li/r.li.daemon/ipc.h	2010-01-27 13:43:55 UTC (rev 40696)
@@ -120,23 +120,3 @@
     fields f;
 } msg;
 
-/**
- * \brief send the specified message through the pipe channel.
- * \param pipe the pipe used to send message
- * \param m the message to send
- * \return 1 message sent, 0 otherwise
- */
-int send(int pipe, msg * m);
-
-/**
- * \brief receive a message through the pipe channel
- * \param pipe the pipe used to receive message
- * \param m the message to receive
- * \return 1 message sent, 0 otherwise
- */
-int receive(int pipe, msg * m);
-
-/**
- * \brief debug function, print a message to STDOUT
- */
-void printMsg(msg m);

Modified: grass/trunk/raster/r.li/r.li.daemon/worker.c
===================================================================
--- grass/trunk/raster/r.li/r.li.daemon/worker.c	2010-01-27 10:44:09 UTC (rev 40695)
+++ grass/trunk/raster/r.li/r.li.daemon/worker.c	2010-01-27 13:43:55 UTC (rev 40696)
@@ -38,27 +38,30 @@
 
 #define CACHESIZE 4194304
 
-void worker(char *raster, int f(int, char **, area_des, double *),
-	    char *server_channel, char *mychannel, char **parameters)
-{
-    int fd, aid;
-    int rec_ch, send_ch, erease_mask = 0, data_type = 0;
-    int cache_rows, used = 0;
-    msg toReceive, toSend;
-    area_des ad;
-    double result;
-    int pid;
-    struct Cell_head hd;
-    cell_manager cm;
-    dcell_manager dm;
-    fcell_manager fm;
+static int fd, aid;
+static int erease_mask = 0, data_type = 0;
+static int cache_rows, used = 0;
+static area_des ad;
+static double result;
+static struct Cell_head hd;
+static cell_manager cm;
+static dcell_manager dm;
+static fcell_manager fm;
+static char *raster;
+static char **parameters;
+static int (*func)(int, char **, area_des, double *);
 
+void worker_init(char *r, int f(int, char **, area_des, double *), char **p)
+{
     cm = G_malloc(sizeof(struct cell_memory_entry));
     fm = G_malloc(sizeof(struct fcell_memory_entry));
     dm = G_malloc(sizeof(struct dcell_memory_entry));
-    pid = getpid();
     ad = G_malloc(sizeof(struct area_entry));
 
+    raster = r;
+    parameters = p;
+    func = f;
+
     /* open raster map */
     fd = Rast_open_old(raster, "");
     Rast_get_cellhd(raster, "", &hd);
@@ -68,154 +71,134 @@
     /* calculate rows in cache */
     switch (data_type) {
     case CELL_TYPE:{
-	    cache_rows = CACHESIZE / (hd.cols * sizeof(CELL));
-	    cm->cache = G_malloc(cache_rows * sizeof(CELL *));
-	    cm->contents = G_malloc(cache_rows * sizeof(int));
-	    cm->used = 0;
-	    cm->contents[0] = -1;
-	} break;
+	cache_rows = CACHESIZE / (hd.cols * sizeof(CELL));
+	cm->cache = G_malloc(cache_rows * sizeof(CELL *));
+	cm->contents = G_malloc(cache_rows * sizeof(int));
+	cm->used = 0;
+	cm->contents[0] = -1;
+    } break;
     case DCELL_TYPE:{
-	    cache_rows = CACHESIZE / (hd.cols * sizeof(DCELL));
-	    dm->cache = G_malloc(cache_rows * sizeof(DCELL *));
-	    dm->contents = G_malloc(cache_rows * sizeof(int));
-	    dm->used = 0;
-	    dm->contents[0] = -1;
-	} break;
+	cache_rows = CACHESIZE / (hd.cols * sizeof(DCELL));
+	dm->cache = G_malloc(cache_rows * sizeof(DCELL *));
+	dm->contents = G_malloc(cache_rows * sizeof(int));
+	dm->used = 0;
+	dm->contents[0] = -1;
+    } break;
     case FCELL_TYPE:{
-	    cache_rows = CACHESIZE / (hd.cols * sizeof(FCELL));
-	    fm->cache = G_malloc(cache_rows * sizeof(FCELL *));
-	    fm->contents = G_malloc(cache_rows * sizeof(int));
-	    fm->used = 0;
-	    fm->contents[0] = -1;
-	} break;
+	cache_rows = CACHESIZE / (hd.cols * sizeof(FCELL));
+	fm->cache = G_malloc(cache_rows * sizeof(FCELL *));
+	fm->contents = G_malloc(cache_rows * sizeof(int));
+	fm->used = 0;
+	fm->contents[0] = -1;
+    } break;
     }
     ad->data_type = data_type;
     ad->cm = cm;
     ad->fm = fm;
     ad->dm = dm;
+}
 
-    /* open receive channel */
-    rec_ch = open(mychannel, O_RDONLY, 0755);
-    if (rec_ch == -1) {
-	G_message(_("CHILD[pid = %i] cannot open receive channel"), pid);
-	exit(0);
-    }
+void worker_process(msg *ret, msg *m)
+{
+    switch (m->type) {
+    case AREA:
+	aid = m->f.f_a.aid;
+	ad->x = m->f.f_a.x;
+	ad->y = m->f.f_a.y;
+	ad->rl = m->f.f_a.rl;
+	ad->cl = m->f.f_a.cl;
+	ad->raster = raster;
+	ad->mask = -1;
+	break;
+    case MASKEDAREA:
+	aid = m->f.f_ma.aid;
+	ad->x = m->f.f_ma.x;
+	ad->y = m->f.f_ma.y;
+	ad->rl = m->f.f_ma.rl;
+	ad->cl = m->f.f_ma.cl;
+	ad->raster = raster;
 
-    /* open send channel */
-    send_ch = open(server_channel, O_WRONLY, 0755);
-    if (send_ch == -1) {
-	G_message(_("CHILD[pid = %i] cannot open receive channel"), pid);
-	exit(0);
-    }
-
-    /* receive loop */
-    receive(rec_ch, &toReceive);
-
-    while (toReceive.type != TERM) {
-	if (toReceive.type == AREA) {
-	    aid = toReceive.f.f_ma.aid;
-	    ad->x = toReceive.f.f_a.x;
-	    ad->y = toReceive.f.f_a.y;
-	    ad->rl = toReceive.f.f_a.rl;
-	    ad->cl = toReceive.f.f_a.cl;
-	    ad->raster = raster;
+	/* mask preprocessing */
+	ad->mask_name = mask_preprocessing(m->f.f_ma.mask,
+					   raster, ad->rl, ad->cl);
+	if (ad->mask_name == NULL) {
+	    G_message(_("unable to open <%s> mask ... continuing without!"),
+		      m->f.f_ma.mask);
 	    ad->mask = -1;
 	}
-	else if (toReceive.type == MASKEDAREA) {
-	    aid = toReceive.f.f_ma.aid;
-	    ad->x = toReceive.f.f_ma.x;
-	    ad->y = toReceive.f.f_ma.y;
-	    ad->rl = toReceive.f.f_ma.rl;
-	    ad->cl = toReceive.f.f_ma.cl;
-	    ad->raster = raster;
-
-	    /* mask preprocessing */
-	    ad->mask_name = mask_preprocessing(toReceive.f.f_ma.mask,
-					       raster, ad->rl, ad->cl);
-	    if (ad->mask_name == NULL) {
-		G_message(_("CHILD[pid = %i]: unable to open <%s> mask ... continuing without!"),
-			  pid, toReceive.f.f_ma.mask);
-		ad->mask = -1;
+	else {
+	    if (strcmp(m->f.f_ma.mask, ad->mask_name) != 0)
+		/* temporary mask created */
+		erease_mask = 1;
+	    ad->mask = open(ad->mask_name, O_WRONLY, 0755);
+	    if (ad->mask == -1) {
+		G_message(_("unable to open <%s> mask ... continuing without!"),
+			  m->f.f_ma.mask);
 	    }
-	    else {
-		if (strcmp(toReceive.f.f_ma.mask, ad->mask_name) != 0)
-		    /* temporary mask created */
-		    erease_mask = 1;
-		ad->mask = open(ad->mask_name, O_WRONLY, 0755);
-		if (ad->mask == -1) {
-		    G_message(_("CHILD[pid = %i]: unable to open <%s> mask ... continuing without!"),
-			      pid, toReceive.f.f_ma.mask);
-		}
-	    }
 	}
-	else
-	    G_fatal_error("Program error, worker() toReceive.type=%d",
-			  toReceive.type);
+	break;
+    default:
+	G_fatal_error("Program error, worker() type=%d", m->type);
+	break;
+    }
 
-	/* memory menagement */
-	if (ad->rl > used) {
-	    /* allocate cache */
-	    int i;
+    /* memory menagement */
+    if (ad->rl > used) {
+	/* allocate cache */
+	int i;
 
-	    switch (data_type) {
-	    case CELL_TYPE:{
-		    for (i = 0; i < (ad->rl - used); i++) {
-			cm->cache[used + i] = Rast_allocate_c_buf();
-		    }
-		}
-		break;
-	    case DCELL_TYPE:{
-		    for (i = 0; i < ad->rl - used; i++) {
-			dm->cache[used + i] = Rast_allocate_d_buf();
-		    }
-		}
-		break;
-	    case FCELL_TYPE:{
-		    for (i = 0; i < ad->rl - used; i++) {
-			fm->cache[used + i] = Rast_allocate_f_buf();
-		    }
-		}
-		break;
+	switch (data_type) {
+	case CELL_TYPE:{
+	    for (i = 0; i < (ad->rl - used); i++) {
+		cm->cache[used + i] = Rast_allocate_c_buf();
 	    }
-	    cm->used = ad->rl;
-	    dm->used = ad->rl;
-	    fm->used = ad->rl;
-	    used = ad->rl;
 	}
-
-	/* calculate function */
-
-	if (f(fd, parameters, ad, &result) == RLI_OK) {
-	    /* success */
-	    toSend.type = DONE;
-	    toSend.f.f_d.aid = aid;
-	    toSend.f.f_d.pid = getpid();
-	    toSend.f.f_d.res = result;
+	    break;
+	case DCELL_TYPE:{
+	    for (i = 0; i < ad->rl - used; i++) {
+		dm->cache[used + i] = Rast_allocate_d_buf();
+	    }
 	}
-	else {
-	    /* fail */
-	    toSend.type = ERROR;
-	    toSend.f.f_e.aid = aid;
-	    toSend.f.f_e.pid = getpid();
+	    break;
+	case FCELL_TYPE:{
+	    for (i = 0; i < ad->rl - used; i++) {
+		fm->cache[used + i] = Rast_allocate_f_buf();
+	    }
 	}
+	    break;
+	}
+	cm->used = ad->rl;
+	dm->used = ad->rl;
+	fm->used = ad->rl;
+	used = ad->rl;
+    }
 
-	send(send_ch, &toSend);
+    /* calculate function */
 
-	if (erease_mask == 1) {
-	    erease_mask = 0;
-	    unlink(ad->mask_name);
-	}
+    if (func(fd, parameters, ad, &result) == RLI_OK) {
+	/* success */
+	ret->type = DONE;
+	ret->f.f_d.aid = aid;
+	ret->f.f_d.pid = 0;
+	ret->f.f_d.res = result;
+    }
+    else {
+	/* fail */
+	ret->type = ERROR;
+	ret->f.f_e.aid = aid;
+	ret->f.f_e.pid = 0;
+    }
 
-	receive(rec_ch, &toReceive);
+    if (erease_mask == 1) {
+	erease_mask = 0;
+	unlink(ad->mask_name);
     }
+}
+
+void worker_end(void)
+{
     /* close raster map */
     Rast_close(fd);
-
-    /* close channels */
-    close(rec_ch);
-    close(send_ch);
-
-    return;
 }
 
 char *mask_preprocessing(char *mask, char *raster, int rl, int cl)



More information about the grass-commit mailing list