[mapserver-commits] r12894 - in trunk/mapserver/mapcache: include src

svn at osgeo.org svn at osgeo.org
Tue Dec 13 12:17:46 EST 2011


Author: tbonfort
Date: 2011-12-13 09:17:46 -0800 (Tue, 13 Dec 2011)
New Revision: 12894

Modified:
   trunk/mapserver/mapcache/include/mapcache.h
   trunk/mapserver/mapcache/src/configuration_xml.c
   trunk/mapserver/mapcache/src/core.c
   trunk/mapserver/mapcache/src/fastcgi_mapcache.c
   trunk/mapserver/mapcache/src/mod_mapcache.c
   trunk/mapserver/mapcache/src/util.c
Log:
add multi-threaded fetching of tiles (potential speedups for unseeded wms requests)


Modified: trunk/mapserver/mapcache/include/mapcache.h
===================================================================
--- trunk/mapserver/mapcache/include/mapcache.h	2011-12-13 17:03:10 UTC (rev 12893)
+++ trunk/mapserver/mapcache/include/mapcache.h	2011-12-13 17:17:46 UTC (rev 12894)
@@ -185,6 +185,7 @@
     void (*log)(mapcache_context *ctx, mapcache_log_level level, char *message, ...);
 
     const char* (*get_instance_id)(mapcache_context * ctx);
+    mapcache_context* (*clone)(mapcache_context *ctx);
     int has_threads;
     apr_pool_t *pool;
     char *_contenttype;
@@ -196,6 +197,7 @@
 };
 
 void mapcache_context_init(mapcache_context *ctx);
+void mapcache_context_copy(mapcache_context *src, mapcache_context *dst);
 
 #define GC_CHECK_ERROR_RETURN(ctx) (if(((mapcache_context*)ctx)->_errcode) return MAPCACHE_FAILURE;)
 #define GC_CHECK_ERROR(ctx) if(((mapcache_context*)ctx)->_errcode) return;
@@ -920,6 +922,8 @@
      * time in nanoseconds to wait before rechecking for lockfile presence
      */
     apr_interval_time_t lock_retry_interval; /* time in nanoseconds to wait before rechecking for lockfile presence */
+
+    int threaded_fetching;
     
     /**
      * the uri where the base of the service is mapped

Modified: trunk/mapserver/mapcache/src/configuration_xml.c
===================================================================
--- trunk/mapserver/mapcache/src/configuration_xml.c	2011-12-13 17:03:10 UTC (rev 12893)
+++ trunk/mapserver/mapcache/src/configuration_xml.c	2011-12-13 17:17:46 UTC (rev 12894)
@@ -951,6 +951,15 @@
       }
    }
    
+   if((node = ezxml_child(doc,"threaded_fetching")) != NULL) {
+      if(!strcasecmp(node->txt,"true")) {
+         config->threaded_fetching = 1;
+      } else if(strcasecmp(node->txt,"false")) {
+         ctx->set_error(ctx, 400, "failed to parse threaded_fetching \"%s\". Expecting true or false",node->txt);
+         return;
+      }
+   }
+
    if((node = ezxml_child(doc,"log_level")) != NULL) {
       if(!strcasecmp(node->txt,"debug")) {
          config->loglevel = MAPCACHE_DEBUG;

Modified: trunk/mapserver/mapcache/src/core.c
===================================================================
--- trunk/mapserver/mapcache/src/core.c	2011-12-13 17:03:10 UTC (rev 12893)
+++ trunk/mapserver/mapcache/src/core.c	2011-12-13 17:17:46 UTC (rev 12894)
@@ -30,8 +30,40 @@
 
 #include <apr_strings.h>
 #include "mapcache.h"
+#if APR_HAS_THREADS
+#include "apu_version.h"
 
+#if (APU_MAJOR_VERSION <= 1 && APU_MINOR_VERSION <= 3)
+ #define USE_THREADPOOL 0
+#else
+ #define USE_THREADPOOL 1
+#endif
 
+/* use a thread pool if using 1.3.12 or higher apu */
+#if !USE_THREADPOOL
+#include <apr_thread_proc.h>
+#else
+#include <apr_thread_pool.h>
+#endif
+ 
+typedef struct {
+   mapcache_tile *tile;
+   mapcache_context *ctx;
+   int launch;
+} _thread_tile;
+
+static void* APR_THREAD_FUNC _thread_get_tile(apr_thread_t *thread, void *data) {
+   _thread_tile* t = (_thread_tile*)data;
+   mapcache_tileset_tile_get(t->ctx, t->tile);
+#if !USE_THREADPOOL
+   apr_thread_exit(thread, APR_SUCCESS);
+#endif
+   return NULL;
+}
+
+#endif
+
+
 mapcache_http_response *mapcache_http_response_create(apr_pool_t *pool) {
    mapcache_http_response *response = (mapcache_http_response*) apr_pcalloc(pool,
          sizeof(mapcache_http_response));
@@ -41,6 +73,118 @@
    return response;
 }
 
+void mapcache_prefetch_tiles(mapcache_context *ctx, mapcache_tile **tiles, int ntiles) {
+
+#if !APR_HAS_THREADS
+   int i;
+   for(i=0;i<ntiles;i++) {
+      mapcache_tileset_tile_get(ctx, tiles[i]);
+      GC_CHECK_ERROR(ctx);
+   }
+#else
+   int i,rv;
+   if(ntiles==1 || ctx->config->threaded_fetching == 0) {
+   /* if threads disabled, or only fetching a single tile, don't launch a thread for the operation */
+      for(i=0;i<ntiles;i++) {
+         mapcache_tileset_tile_get(ctx, tiles[i]);
+         GC_CHECK_ERROR(ctx);
+      }
+      return;
+   }
+
+
+   /* allocate a thread struct for each tile. Not all will be used */
+   _thread_tile* thread_tiles = (_thread_tile*)apr_pcalloc(ctx->pool,ntiles*sizeof(_thread_tile));
+#if 1 || !USE_THREADPOOL
+   /* use multiple threads, to fetch from multiple metatiles and/or multiple tilesets */
+   apr_thread_t **threads;
+   apr_threadattr_t *thread_attrs;
+   apr_threadattr_create(&thread_attrs, ctx->pool);
+   threads = (apr_thread_t**)apr_pcalloc(ctx->pool, ntiles*sizeof(apr_thread_t*));
+   int nthreads = 0;
+   for(i=0;i<ntiles;i++) {
+      thread_tiles[i].tile = tiles[i];
+      thread_tiles[i].launch = 1;
+      int j=i-1;
+      /* 
+       * we only launch one thread per metatile as in the unseeded case the threads
+       * for a same metatile will lock while only a single thread launches the actual
+       * rendering request
+       */
+      while(j>=0) {
+         /* check that the given metatile hasn't been rendered yet */
+         if(thread_tiles[j].launch &&
+               (thread_tiles[i].tile->tileset == thread_tiles[j].tile->tileset) &&
+               (thread_tiles[i].tile->x / thread_tiles[i].tile->tileset->metasize_x  == 
+                  thread_tiles[j].tile->x / thread_tiles[j].tile->tileset->metasize_x)&&
+               (thread_tiles[i].tile->y / thread_tiles[i].tile->tileset->metasize_y  == 
+                  thread_tiles[j].tile->y / thread_tiles[j].tile->tileset->metasize_y)) {
+            thread_tiles[i].launch = 0; /* this tile will not have a thread spawned for it */
+            break;
+         }
+         j--;
+      }
+      if(thread_tiles[i].launch)
+         thread_tiles[i].ctx = ctx->clone(ctx);
+   }
+   for(i=0;i<ntiles;i++) {
+      if(!thread_tiles[i].launch) continue; /* skip tiles that have been marked */
+      rv = apr_thread_create(&threads[i], thread_attrs, _thread_get_tile, (void*)&(thread_tiles[i]), thread_tiles[i].ctx->pool);
+      if(rv != APR_SUCCESS) {
+         ctx->set_error(ctx,500, "failed to create thread %d of %d\n",i,ntiles);
+         break;
+      }
+      nthreads++;
+   }
+
+   /* wait for launched threads to finish */
+   for(i=0;i<ntiles;i++) {
+      if(!thread_tiles[i].launch) continue;
+      apr_thread_join(&rv, threads[i]);
+      if(rv != APR_SUCCESS) {
+         ctx->set_error(ctx,500, "thread %d of %d failed on exit\n",i,ntiles);
+      }
+      if(GC_HAS_ERROR(thread_tiles[i].ctx)) {
+         /* transfer error message from child thread to main context */
+         ctx->set_error(ctx,thread_tiles[i].ctx->get_error(thread_tiles[i].ctx),
+               thread_tiles[i].ctx->get_error_message(thread_tiles[i].ctx));
+      }
+   }
+   for(i=0;i<ntiles;i++) {
+      /* fetch the tiles that did not get a thread launched for them */
+      if(thread_tiles[i].launch) continue;
+      mapcache_tileset_tile_get(ctx, tiles[i]);
+   }
+#else
+    /* experimental version using a threadpool, disabled for stability reasons */
+   apr_thread_pool_t *thread_pool;
+   apr_thread_pool_create(&thread_pool,2,ctx->config->download_threads,ctx->pool);
+   for(i=0;i<ntiles;i++) {
+      ctx->log(ctx,MAPCACHE_DEBUG,"starting thread for tile %s",tiles[i]->tileset->name);
+      thread_tiles[i].tile = tiles[i];
+      thread_tiles[i].ctx = ctx->clone(ctx);
+      rv = apr_thread_pool_push(thread_pool,_thread_get_tile,(void*)&(thread_tiles[i]), 0,NULL);
+      if(rv != APR_SUCCESS) {
+         ctx->set_error(ctx,500, "failed to push thread %d of %d in thread pool\n",i,ntiles);
+         break;
+      }
+   }
+   GC_CHECK_ERROR(ctx);
+   while(apr_thread_pool_tasks_run_count(thread_pool) != ntiles || apr_thread_pool_busy_count(thread_pool)>0)
+      apr_sleep(10000);
+   apr_thread_pool_destroy(thread_pool);
+   for(i=0;i<ntiles;i++) {
+      if(GC_HAS_ERROR(thread_tiles[i].ctx)) {
+         ctx->set_error(ctx,thread_tiles[i].ctx->get_error(thread_tiles[i].ctx),
+               thread_tiles[i].ctx->get_error_message(thread_tiles[i].ctx));
+      }
+   }
+#endif
+   
+#endif
+
+}
+
 mapcache_http_response *mapcache_core_get_tile(mapcache_context *ctx, mapcache_request_get_tile *req_tile) {
   int expires = 0;
   mapcache_http_response *response;
@@ -59,11 +203,14 @@
    response = mapcache_http_response_create(ctx->pool);
   
 
+   mapcache_prefetch_tiles(ctx,req_tile->tiles,req_tile->ntiles);
+   if(GC_HAS_ERROR(ctx))
+      return NULL;
+
    /* this loop retrieves the tiles from the caches, and eventually decodes and merges them together
     * if multiple tiles were asked for */
    for(i=0;i<req_tile->ntiles;i++) {
       mapcache_tile *tile = req_tile->tiles[i];
-      mapcache_tileset_tile_get(ctx, tile);
       if(GC_HAS_ERROR(ctx))
          return NULL;
       if(i==0) {
@@ -141,34 +288,48 @@
    return response;
 }
 
+void mapcache_fetch_maps(mapcache_context *ctx, mapcache_map **maps, int nmaps, mapcache_resample_mode mode ) {
+   mapcache_tile ***maptiles;
+   int *nmaptiles;
+   mapcache_tile **tiles;
+   int ntiles = 0;
+   int i;
+   maptiles = apr_pcalloc(ctx->pool,nmaps*sizeof(mapcache_tile**));
+   nmaptiles = apr_pcalloc(ctx->pool,nmaps*sizeof(int));
+   for(i=0;i<nmaps;i++) {
+      mapcache_tileset_get_map_tiles(ctx,maps[i]->tileset,maps[i]->grid_link,
+            maps[i]->extent, maps[i]->width, maps[i]->height,
+            &(nmaptiles[i]), &(maptiles[i]));
+      ntiles += nmaptiles[i];
+   }
+   tiles = apr_pcalloc(ctx->pool,ntiles * sizeof(mapcache_tile*));
+   ntiles = 0;
+   for(i=0;i<nmaps;i++) {
+      int j;
+      for(j=0;j<nmaptiles[i];j++) {
+         tiles[ntiles] = maptiles[i][j];
+         tiles[ntiles]->dimensions = maps[i]->dimensions;
+         ntiles++;
+      }
+   }
+   mapcache_prefetch_tiles(ctx,tiles,ntiles);
+   for(i=0;i<nmaps;i++) {
+      int j;
+      for(j=0;j<nmaptiles[i];j++) {
+         /* update the map modification time if it is older than the tile mtime */
+         if(maptiles[i][j]->mtime>maps[i]->mtime) maps[i]->mtime = maptiles[i][j]->mtime;
 
-
-void _core_get_single_map(mapcache_context *ctx, mapcache_map *map, mapcache_resample_mode mode) {
-
-   mapcache_tile **maptiles;
-   int i,nmaptiles;
-   mapcache_tileset_get_map_tiles(ctx,map->tileset,map->grid_link,
-         map->extent, map->width, map->height,
-         &nmaptiles, &maptiles);
-   for(i=0;i<nmaptiles;i++) {
-      mapcache_tile *tile = maptiles[i];
-      tile->dimensions = map->dimensions;
-      mapcache_tileset_tile_get(ctx, tile);
-      GC_CHECK_ERROR(ctx);
-      
-      /* update the map modification time if it is older than the tile mtime */
-      if(tile->mtime>map->mtime) map->mtime = tile->mtime;
-
-      /* set the map expiration delay to the tile expiration delay,
-       * either if the map hasn't got an expiration delay yet
-       * or if the tile expiration is shorter than the map expiration
-       */
-      if(!map->expires || tile->expires<map->expires) map->expires = tile->expires;
+         /* set the map expiration delay to the tile expiration delay,
+          * either if the map hasn't got an expiration delay yet
+          * or if the tile expiration is shorter than the map expiration
+          */
+         if(!maps[i]->expires || maptiles[i][j]->expires<maps[i]->expires) maps[i]->expires =maptiles[i][j]->expires;
+      }
+      maps[i]->raw_image = mapcache_tileset_assemble_map_tiles(ctx,maps[i]->tileset,maps[i]->grid_link,
+            maps[i]->extent, maps[i]->width, maps[i]->height,
+            nmaptiles[i], maptiles[i],
+            mode);
    }
-   map->raw_image = mapcache_tileset_assemble_map_tiles(ctx,map->tileset,map->grid_link,
-         map->extent, map->width, map->height,
-         nmaptiles, maptiles,
-         mode);
 }
 
 mapcache_http_response *mapcache_core_get_map(mapcache_context *ctx, mapcache_request_get_map *req_map) {
@@ -192,15 +353,14 @@
    
    format = NULL;
    response = mapcache_http_response_create(ctx->pool);
-  basemap = req_map->maps[0];
+   basemap = req_map->maps[0];
 
    
    if(req_map->getmap_strategy == MAPCACHE_GETMAP_ASSEMBLE) {
-      _core_get_single_map(ctx,basemap,req_map->resample_mode);
+      mapcache_fetch_maps(ctx, req_map->maps, req_map->nmaps, req_map->resample_mode);
       if(GC_HAS_ERROR(ctx)) return NULL;
       for(i=1;i<req_map->nmaps;i++) {
          mapcache_map *overlaymap = req_map->maps[i];
-         _core_get_single_map(ctx,overlaymap,req_map->resample_mode); 
          if(GC_HAS_ERROR(ctx)) return NULL;
          mapcache_image_merge(ctx,basemap->raw_image,overlaymap->raw_image);
          if(GC_HAS_ERROR(ctx)) return NULL;

Modified: trunk/mapserver/mapcache/src/fastcgi_mapcache.c
===================================================================
--- trunk/mapserver/mapcache/src/fastcgi_mapcache.c	2011-12-13 17:03:10 UTC (rev 12893)
+++ trunk/mapserver/mapcache/src/fastcgi_mapcache.c	2011-12-13 17:17:46 UTC (rev 12894)
@@ -68,10 +68,17 @@
 
 struct mapcache_context_fcgi {
    mapcache_context ctx;
-   char *mutex_fname;
-   apr_file_t *mutex_file;
 };
 
+static mapcache_context* fcgi_context_clone(mapcache_context *ctx) {
+   mapcache_context_fcgi *newctx = (mapcache_context_fcgi*)apr_pcalloc(ctx->pool,
+         sizeof(mapcache_context_fcgi));
+   mapcache_context *nctx = (mapcache_context*)newctx;
+   mapcache_context_copy(ctx,nctx);
+   apr_pool_create(&nctx->pool,ctx->pool);
+   return nctx;
+}
+
 static void fcgi_context_log(mapcache_context *c, mapcache_log_level level, char *message, ...) {
    va_list args;
    if(!c->config || level >= c->config->loglevel) {
@@ -94,7 +101,7 @@
    ctx->ctx.pool = global_pool;
    mapcache_context_init((mapcache_context*)ctx);
    ctx->ctx.log = fcgi_context_log;
-   ctx->mutex_fname="/tmp/mapcache.fcgi.lock";
+   ctx->ctx.clone = fcgi_context_clone;
    ctx->ctx.config = NULL;
    return ctx;
 }
@@ -314,6 +321,7 @@
    }
 #endif
    apr_pool_destroy(global_pool);
+   apr_terminate();
    return 0;
 
 }

Modified: trunk/mapserver/mapcache/src/mod_mapcache.c
===================================================================
--- trunk/mapserver/mapcache/src/mod_mapcache.c	2011-12-13 17:03:10 UTC (rev 12893)
+++ trunk/mapserver/mapcache/src/mod_mapcache.c	2011-12-13 17:17:46 UTC (rev 12894)
@@ -144,9 +144,22 @@
    ap_log_rerror(APLOG_MARK, ap_log_level, 0, ctx->request, "%s", apr_pvsprintf(c->pool,message,args));
 }
 
+mapcache_context *mapcache_context_request_clone(mapcache_context *ctx) {
+   mapcache_context_apache_request *newctx = (mapcache_context_apache_request*)apr_pcalloc(ctx->pool,
+         sizeof(mapcache_context_apache_request));
+   mapcache_context *nctx = (mapcache_context*)newctx;
+   mapcache_context_copy(ctx,nctx);
+   //apr_pool_create(&nctx->pool,ctx->pool);
+   apr_pool_create(&nctx->pool,NULL);
+   apr_pool_cleanup_register(ctx->pool, nctx->pool,(void*)apr_pool_destroy, apr_pool_cleanup_null);
+   newctx->request = ((mapcache_context_apache_request*)ctx)->request;
+   return nctx;
+}
+
 void init_apache_request_context(mapcache_context_apache_request *ctx) {
    mapcache_context_init((mapcache_context*)ctx);
    ctx->ctx.ctx.log = apache_context_request_log;
+   ctx->ctx.ctx.clone = mapcache_context_request_clone;
 }
 
 void init_apache_server_context(mapcache_context_apache_server *ctx) {

Modified: trunk/mapserver/mapcache/src/util.c
===================================================================
--- trunk/mapserver/mapcache/src/util.c	2011-12-13 17:03:10 UTC (rev 12893)
+++ trunk/mapserver/mapcache/src/util.c	2011-12-13 17:17:46 UTC (rev 12894)
@@ -196,5 +196,25 @@
     ctx->clear_errors = _mapcache_context_clear_error_default;
 }
 
+void mapcache_context_copy(mapcache_context *src, mapcache_context *dst) {
+   dst->_contenttype = src->_contenttype;
+   dst->_errcode = src->_errcode;
+   dst->_errmsg = src->_errmsg;
+   dst->clear_errors = src->clear_errors;
+   dst->clone = src->clone;
+   dst->config = src->config;
+   dst->get_error = src->get_error;
+   dst->get_error_message = src->get_error_message;
+   dst->get_instance_id = src->get_instance_id;
+   dst->log = src->log;
+   dst->set_error = src->set_error;
+   dst->pool = src->pool;
+   dst->set_exception = src->set_exception;
+   dst->service = src->service;
+   dst->has_threads = src->has_threads;
+   dst->exceptions = src->exceptions;
+}
+
+
 /* vim: ai ts=3 sts=3 et sw=3
 */



More information about the mapserver-commits mailing list