[postgis-tickets] r16105 - Parallel implementation of ST_AsMVT (Closes #3927)

Paul Ramsey pramsey at cleverelephant.ca
Thu Nov 9 09:21:45 PST 2017


Author: pramsey
Date: 2017-11-09 09:21:45 -0800 (Thu, 09 Nov 2017)
New Revision: 16105

Modified:
   trunk/NEWS
   trunk/postgis/lwgeom_out_mvt.c
   trunk/postgis/mvt.c
   trunk/postgis/mvt.h
   trunk/postgis/postgis.sql.in
Log:
Parallel implementation of ST_AsMVT (Closes #3927)


Modified: trunk/NEWS
===================================================================
--- trunk/NEWS	2017-11-09 16:27:26 UTC (rev 16104)
+++ trunk/NEWS	2017-11-09 17:21:45 UTC (rev 16105)
@@ -13,6 +13,7 @@
            with core PostGIS functions.
 
 * Enhancements and Fixes*
+  - #3927, Parallel implementation of ST_AsMVT
   - #3925, Simplify geometry using map grid cell size before generating MVT
   - #3899, BTree sort order is now defined on collections of EMPTY and
            same-prefix geometries (Darafei Praliaskouski)

Modified: trunk/postgis/lwgeom_out_mvt.c
===================================================================
--- trunk/postgis/lwgeom_out_mvt.c	2017-11-09 16:27:26 UTC (rev 16104)
+++ trunk/postgis/lwgeom_out_mvt.c	2017-11-09 17:21:45 UTC (rev 16105)
@@ -55,7 +55,7 @@
 	geom_in = PG_GETARG_GSERIALIZED_P_COPY(0);
 	lwgeom_in = lwgeom_from_gserialized(geom_in);
 	if (PG_ARGISNULL(1))
-		elog(ERROR, "ST_AsMVTGeom: parameter bounds cannot be null");
+		elog(ERROR, "%s: parameter bounds cannot be null", __func__);
 	bounds = (GBOX *) PG_GETARG_POINTER(1);
 	extent = PG_ARGISNULL(2) ? 4096 : PG_GETARG_INT32(2);
 	buffer = PG_ARGISNULL(3) ? 256 : PG_GETARG_INT32(3);
@@ -83,10 +83,10 @@
 	PG_RETURN_NULL();
 #else
 	MemoryContext aggcontext;
-	struct mvt_agg_context *ctx;
+	mvt_agg_context *ctx;
 
 	if (!AggCheckCallContext(fcinfo, &aggcontext))
-		elog(ERROR, "pgis_asmvt_transfn: called in non-aggregate context");
+		elog(ERROR, "%s called in non-aggregate context", __func__);
 	MemoryContextSwitchTo(aggcontext);
 
 	if (PG_ARGISNULL(0)) {
@@ -102,11 +102,11 @@
 			ctx->geom_name = text_to_cstring(PG_GETARG_TEXT_P(4));
 		mvt_agg_init_context(ctx);
 	} else {
-		ctx = (struct mvt_agg_context *) PG_GETARG_POINTER(0);
+		ctx = (mvt_agg_context *) PG_GETARG_POINTER(0);
 	}
 
 	if (!type_is_rowtype(get_fn_expr_argtype(fcinfo->flinfo, 1)))
-		elog(ERROR, "pgis_asmvt_transfn: parameter row cannot be other than a rowtype");
+		elog(ERROR, "%s: parameter row cannot be other than a rowtype", __func__);
 	ctx->row = PG_GETARG_HEAPTUPLEHEADER(1);
 
 	mvt_agg_transfn(ctx);
@@ -125,10 +125,11 @@
 	elog(ERROR, "Missing libprotobuf-c");
 	PG_RETURN_NULL();
 #else
-	struct mvt_agg_context *ctx;
-	uint8_t *buf;
+	mvt_agg_context *ctx;
+	bytea *buf;
+	elog(DEBUG2, "%s called", __func__);
 	if (!AggCheckCallContext(fcinfo, NULL))
-		elog(ERROR, "pgis_asmvt_finalfn called in non-aggregate context");
+		elog(ERROR, "%s called in non-aggregate context", __func__);
 
 	if (PG_ARGISNULL(0))
 	{
@@ -137,8 +138,76 @@
 		PG_RETURN_BYTEA_P(emptybuf);
 	}
 
-	ctx = (struct mvt_agg_context *) PG_GETARG_POINTER(0);
+	ctx = (mvt_agg_context *) PG_GETARG_POINTER(0);
 	buf = mvt_agg_finalfn(ctx);
 	PG_RETURN_BYTEA_P(buf);
 #endif
 }
+
+/**
+ * Serialization function wired into the ST_AsMVT aggregates.
+ *
+ * Converts the internal transition state (argument 0) into a bytea by
+ * calling mvt_ctx_serialize(). A NULL state yields a bytea that holds
+ * only its varlena header.
+ */
+PG_FUNCTION_INFO_V1(pgis_asmvt_serialfn);
+Datum pgis_asmvt_serialfn(PG_FUNCTION_ARGS)
+{
+#ifndef HAVE_LIBPROTOBUF
+	elog(ERROR, "Missing libprotobuf-c");
+	PG_RETURN_NULL();
+#else
+	bytea *result;
+
+	elog(DEBUG2, "%s called", __func__);
+	if (!AggCheckCallContext(fcinfo, NULL))
+		elog(ERROR, "%s called in non-aggregate context", __func__);
+
+	if (!PG_ARGISNULL(0))
+	{
+		mvt_agg_context *state = (mvt_agg_context *) PG_GETARG_POINTER(0);
+		result = mvt_ctx_serialize(state);
+	}
+	else
+	{
+		/* No state at all: hand back a header-only bytea */
+		result = palloc(VARHDRSZ);
+		SET_VARSIZE(result, VARHDRSZ);
+	}
+
+	PG_RETURN_BYTEA_P(result);
+#endif
+}
+
+
+/**
+ * Deserialization function wired into the ST_AsMVT aggregates.
+ *
+ * Rebuilds an aggregation context from the bytea in argument 0 using
+ * mvt_ctx_deserialize(). The call is made while the aggregate memory
+ * context is current; the previous context is restored before returning.
+ */
+PG_FUNCTION_INFO_V1(pgis_asmvt_deserialfn);
+Datum pgis_asmvt_deserialfn(PG_FUNCTION_ARGS)
+{
+#ifndef HAVE_LIBPROTOBUF
+	elog(ERROR, "Missing libprotobuf-c");
+	PG_RETURN_NULL();
+#else
+	MemoryContext agg_mcxt;
+	MemoryContext caller_mcxt;
+	mvt_agg_context *state;
+
+	elog(DEBUG2, "%s called", __func__);
+	if (!AggCheckCallContext(fcinfo, &agg_mcxt))
+		elog(ERROR, "%s called in non-aggregate context", __func__);
+
+	caller_mcxt = MemoryContextSwitchTo(agg_mcxt);
+	state = mvt_ctx_deserialize(PG_GETARG_BYTEA_P(0));
+	MemoryContextSwitchTo(caller_mcxt);
+
+	PG_RETURN_POINTER(state);
+#endif
+}
+
+/**
+ * Combine function wired into the ST_AsMVT aggregates.
+ *
+ * Merges the two partial states in arguments 0 and 1 with
+ * mvt_ctx_combine(), running it inside the aggregate memory context.
+ * Either argument may be SQL NULL; a NULL argument is passed on as a
+ * NULL pointer rather than being read with PG_GETARG_POINTER. When
+ * mvt_ctx_combine() yields no context, SQL NULL is returned so later
+ * stages see a NULL state instead of a non-null Datum holding a NULL
+ * pointer.
+ */
+PG_FUNCTION_INFO_V1(pgis_asmvt_combinefn);
+Datum pgis_asmvt_combinefn(PG_FUNCTION_ARGS)
+{
+#ifndef HAVE_LIBPROTOBUF
+	elog(ERROR, "Missing libprotobuf-c");
+	PG_RETURN_NULL();
+#else
+	MemoryContext aggcontext, oldcontext;
+	mvt_agg_context *ctx, *ctx1, *ctx2;
+
+	elog(DEBUG2, "%s called", __func__);
+	if (!AggCheckCallContext(fcinfo, &aggcontext))
+		elog(ERROR, "%s called in non-aggregate context", __func__);
+
+	/* The Datum of a NULL argument is not meaningful: test the flag first */
+	ctx1 = PG_ARGISNULL(0) ? NULL : (mvt_agg_context*)PG_GETARG_POINTER(0);
+	ctx2 = PG_ARGISNULL(1) ? NULL : (mvt_agg_context*)PG_GETARG_POINTER(1);
+
+	oldcontext = MemoryContextSwitchTo(aggcontext);
+	ctx = mvt_ctx_combine(ctx1, ctx2);
+	MemoryContextSwitchTo(oldcontext);
+
+	/* Both inputs were NULL: report a NULL state, not a NULL pointer */
+	if (!ctx)
+		PG_RETURN_NULL();
+	PG_RETURN_POINTER(ctx);
+#endif
+}
+

Modified: trunk/postgis/mvt.c
===================================================================
--- trunk/postgis/mvt.c	2017-11-09 16:27:26 UTC (rev 16104)
+++ trunk/postgis/mvt.c	2017-11-09 17:21:45 UTC (rev 16105)
@@ -103,7 +103,7 @@
 	return (value << 1) ^ (value >> 31);
 }
 
-static uint32_t encode_ptarray(struct mvt_agg_context *ctx, enum mvt_type type,
+static uint32_t encode_ptarray(mvt_agg_context *ctx, enum mvt_type type,
 			       POINTARRAY *pa, uint32_t *buffer,
 			       int32_t *px, int32_t *py)
 {
@@ -149,7 +149,7 @@
 	return offset;
 }
 
-static uint32_t encode_ptarray_initial(struct mvt_agg_context *ctx,
+static uint32_t encode_ptarray_initial(mvt_agg_context *ctx,
 				       enum mvt_type type,
 				       POINTARRAY *pa, uint32_t *buffer)
 {
@@ -157,7 +157,7 @@
 	return encode_ptarray(ctx, type, pa, buffer, &px, &py);
 }
 
-static void encode_point(struct mvt_agg_context *ctx, LWPOINT *point)
+static void encode_point(mvt_agg_context *ctx, LWPOINT *point)
 {
 	VectorTile__Tile__Feature *feature = ctx->feature;
 	feature->type = VECTOR_TILE__TILE__GEOM_TYPE__POINT;
@@ -167,7 +167,7 @@
 	encode_ptarray_initial(ctx, MVT_POINT, point->point, feature->geometry);
 }
 
-static void encode_mpoint(struct mvt_agg_context *ctx, LWMPOINT *mpoint)
+static void encode_mpoint(mvt_agg_context *ctx, LWMPOINT *mpoint)
 {
 	size_t c;
 	VectorTile__Tile__Feature *feature = ctx->feature;
@@ -181,7 +181,7 @@
 		lwline->points, feature->geometry);
 }
 
-static void encode_line(struct mvt_agg_context *ctx, LWLINE *lwline)
+static void encode_line(mvt_agg_context *ctx, LWLINE *lwline)
 {
 	size_t c;
 	VectorTile__Tile__Feature *feature = ctx->feature;
@@ -193,7 +193,7 @@
 		lwline->points, feature->geometry);
 }
 
-static void encode_mline(struct mvt_agg_context *ctx, LWMLINE *lwmline)
+static void encode_mline(mvt_agg_context *ctx, LWMLINE *lwmline)
 {
 	uint32_t i;
 	int32_t px = 0, py = 0;
@@ -211,7 +211,7 @@
 	feature->n_geometry = offset;
 }
 
-static void encode_poly(struct mvt_agg_context *ctx, LWPOLY *lwpoly)
+static void encode_poly(mvt_agg_context *ctx, LWPOLY *lwpoly)
 {
 	uint32_t i;
 	int32_t px = 0, py = 0;
@@ -229,7 +229,7 @@
 	feature->n_geometry = offset;
 }
 
-static void encode_mpoly(struct mvt_agg_context *ctx, LWMPOLY *lwmpoly)
+static void encode_mpoly(mvt_agg_context *ctx, LWMPOLY *lwmpoly)
 {
 	uint32_t i, j;
 	int32_t px = 0, py = 0;
@@ -250,7 +250,7 @@
 	feature->n_geometry = offset;
 }
 
-static void encode_geometry(struct mvt_agg_context *ctx, LWGEOM *lwgeom)
+static void encode_geometry(mvt_agg_context *ctx, LWGEOM *lwgeom)
 {
 	int type = lwgeom->type;
 
@@ -272,7 +272,7 @@
 	}
 }
 
-static TupleDesc get_tuple_desc(struct mvt_agg_context *ctx)
+static TupleDesc get_tuple_desc(mvt_agg_context *ctx)
 {
 	Oid tupType = HeapTupleHeaderGetTypeId(ctx->row);
 	int32 tupTypmod = HeapTupleHeaderGetTypMod(ctx->row);
@@ -280,7 +280,7 @@
 	return tupdesc;
 }
 
-static uint32_t get_key_index(struct mvt_agg_context *ctx, char *name)
+static uint32_t get_key_index(mvt_agg_context *ctx, char *name)
 {
 	struct mvt_kv_key *kv;
 	size_t size = strlen(name);
@@ -290,7 +290,7 @@
 	return kv->id;
 }
 
-static uint32_t add_key(struct mvt_agg_context *ctx, char *name)
+static uint32_t add_key(mvt_agg_context *ctx, char *name)
 {
 	struct mvt_kv_key *kv;
 	size_t size = strlen(name);
@@ -301,7 +301,7 @@
 	return kv->id;
 }
 
-static void parse_column_keys(struct mvt_agg_context *ctx)
+static void parse_column_keys(mvt_agg_context *ctx)
 {
 	TupleDesc tupdesc = get_tuple_desc(ctx);
 	int natts = tupdesc->natts;
@@ -343,7 +343,7 @@
 	ReleaseTupleDesc(tupdesc);
 }
 
-static void encode_keys(struct mvt_agg_context *ctx)
+static void encode_keys(mvt_agg_context *ctx)
 {
 	struct mvt_kv_key *kv;
 	size_t n_keys = ctx->keys_hash_i;
@@ -375,7 +375,7 @@
 	} \
 }
 
-static void encode_values(struct mvt_agg_context *ctx)
+static void encode_values(mvt_agg_context *ctx)
 {
 	POSTGIS_DEBUG(2, "encode_values called");
 	VectorTile__Tile__Value **values;
@@ -454,7 +454,7 @@
 	MVT_PARSE_INT_VALUE(value); \
 }
 
-static void add_value_as_string(struct mvt_agg_context *ctx,
+static void add_value_as_string(mvt_agg_context *ctx,
 	char *value, uint32_t *tags, uint32_t k)
 {
 	struct mvt_kv_string_value *kv;
@@ -475,7 +475,7 @@
 	tags[ctx->c*2+1] = kv->id;
 }
 
-static void parse_datum_as_string(struct mvt_agg_context *ctx, Oid typoid,
+static void parse_datum_as_string(mvt_agg_context *ctx, Oid typoid,
 	Datum datum, uint32_t *tags, uint32_t k)
 {
 	Oid foutoid;
@@ -489,7 +489,7 @@
 }
 
 #if POSTGIS_PGSQL_VERSION >= 94
-static uint32_t *parse_jsonb(struct mvt_agg_context *ctx, Jsonb *jb,
+static uint32_t *parse_jsonb(mvt_agg_context *ctx, Jsonb *jb,
 	uint32_t *tags)
 {
 	JsonbIterator *it;
@@ -553,7 +553,7 @@
 }
 #endif
 
-static void parse_values(struct mvt_agg_context *ctx)
+static void parse_values(mvt_agg_context *ctx)
 {
 	POSTGIS_DEBUG(2, "parse_values called");
 	uint32_t n_keys = ctx->keys_hash_i;
@@ -797,7 +797,7 @@
 /**
  * Initialize aggregation context.
  */
-void mvt_agg_init_context(struct mvt_agg_context *ctx)
+void mvt_agg_init_context(mvt_agg_context *ctx)
 {
 	VectorTile__Tile__Layer *layer;
 
@@ -806,6 +806,7 @@
 	if (ctx->extent == 0)
 		elog(ERROR, "mvt_agg_init_context: extent cannot be 0");
 
+	ctx->tile = NULL;
 	ctx->features_capacity = FEATURES_CAPACITY_INITIAL;
 	ctx->keys_hash = NULL;
 	ctx->string_values_hash = NULL;
@@ -836,7 +837,7 @@
  * Allocates a new feature, increment feature counter and
  * encode geometry and properties into it.
  */
-void mvt_agg_transfn(struct mvt_agg_context *ctx)
+void mvt_agg_transfn(mvt_agg_context *ctx)
 {
 	bool isnull = false;
 	Datum datum;
@@ -885,45 +886,259 @@
 	parse_values(ctx);
 }
 
-/**
- * Finalize aggregation.
- *
- * Encode keys and values and put the aggregated Layer message into
- * a Tile message and returns it packed as a bytea.
- */
-uint8_t *mvt_agg_finalfn(struct mvt_agg_context *ctx)
+/*
+ * Encode the keys and values gathered in the context, then wrap the
+ * context's layer in a newly allocated single-layer Tile message.
+ * The layer itself is referenced, not copied.
+ */
+static VectorTile__Tile * mvt_ctx_to_tile(mvt_agg_context *ctx)
 {
-	VectorTile__Tile__Layer *layers[1];
-	VectorTile__Tile tile = VECTOR_TILE__TILE__INIT;
-	size_t len;
-	uint8_t *buf;
+	const int layer_count = 1;
+	VectorTile__Tile *result;
+
+	encode_keys(ctx);
+	encode_values(ctx);
 
-	POSTGIS_DEBUG(2, "mvt_agg_finalfn called");
-	POSTGIS_DEBUGF(2, "mvt_agg_finalfn n_features == %zd", ctx->layer->n_features);
+	result = palloc(sizeof(VectorTile__Tile));
+	vector_tile__tile__init(result);
+
+	result->n_layers = layer_count;
+	result->layers = palloc(sizeof(VectorTile__Tile__Layer*) * layer_count);
+	result->layers[0] = ctx->layer;
+
+	return result;
+}
 
+/*
+ * Pack an aggregation context into a bytea.
+ *
+ * A NULL context, or a context whose layer holds zero features, gives
+ * a bytea containing only the varlena header. Otherwise the context's
+ * Tile message is packed with protobuf-c after the header. If the
+ * context has no Tile yet, one is built from its layer first and
+ * stored in ctx->tile.
+ */
+static bytea *mvt_ctx_to_bytea(mvt_agg_context *ctx)
+{
+	bytea *ba;
+	size_t len;
+
+	/* Guard before any dereference: no context => empty bytea output */
+	if (!ctx)
+	{
+		ba = palloc(VARHDRSZ);
+		SET_VARSIZE(ba, VARHDRSZ);
+		return ba;
+	}
+
+	/* Fill out the tile slot, if it's not already filled. */
+	/* We should only have a filled slot when all the work of building */
+	/* out the data is complete, so after a serialize/deserialize cycle */
+	/* or after a context combine */
+	if (!ctx->tile)
+	{
+		ctx->tile = mvt_ctx_to_tile(ctx);
+	}
+
 	/* Zero features => empty bytea output */
-	if (ctx->layer->n_features == 0)
+	if (ctx->layer && ctx->layer->n_features == 0)
 	{
-		buf = palloc(VARHDRSZ);
-		SET_VARSIZE(buf, VARHDRSZ);
-		return buf;
+		ba = palloc(VARHDRSZ);
+		SET_VARSIZE(ba, VARHDRSZ);
+		return ba;
 	}
 
-	encode_keys(ctx);
-	encode_values(ctx);
+	/* Serialize the Tile */
+	len = VARHDRSZ + vector_tile__tile__get_packed_size(ctx->tile);
+	ba = palloc(len);
+	vector_tile__tile__pack(ctx->tile, (uint8_t*)VARDATA(ba));
+	SET_VARSIZE(ba, len);
+	return ba;
+}
 
-	layers[0] = ctx->layer;
 
-	tile.n_layers = 1;
-	tile.layers = layers;
+/* Serialize an aggregation context; delegates to mvt_ctx_to_bytea(). */
+bytea * mvt_ctx_serialize(mvt_agg_context *ctx)
+{
+	bytea *packed = mvt_ctx_to_bytea(ctx);
+	return packed;
+}
 
-	len = vector_tile__tile__get_packed_size(&tile);
-	buf = palloc(sizeof(*buf) * (len + VARHDRSZ));
-	vector_tile__tile__pack(&tile, buf + VARHDRSZ);
+/* Allocation callback backed by palloc; the data argument is not used. */
+static void * mvt_allocator(void *data, size_t size)
+{
+	void *chunk = palloc(size);
+	return chunk;
+}
 
-	SET_VARSIZE(buf, VARHDRSZ + len);
+/*
+ * Deallocation callback backed by pfree; the data argument is not used.
+ * pfree is called as a plain statement: a void function may not return
+ * an expression in ISO C.
+ */
+static void mvt_deallocator(void *data, void *ptr)
+{
+	pfree(ptr);
+}
 
-	return buf;
+/*
+ * Rebuild an aggregation context from a serialized Tile.
+ *
+ * The bytea payload is unpacked with protobuf-c, using palloc/pfree
+ * based allocation callbacks. The returned context is zeroed except
+ * for its tile slot, which points at the unpacked Tile. Raises an
+ * ERROR if the payload cannot be unpacked, so that a context with
+ * neither a tile nor a layer is never handed back to callers.
+ */
+mvt_agg_context * mvt_ctx_deserialize(const bytea *ba)
+{
+	ProtobufCAllocator allocator = {
+		mvt_allocator,
+		mvt_deallocator,
+		NULL
+	};
+
+	size_t len = VARSIZE(ba) - VARHDRSZ;
+	VectorTile__Tile *tile = vector_tile__tile__unpack(&allocator, len, (uint8_t*)VARDATA(ba));
+	mvt_agg_context *ctx;
+
+	/* The unpack call reports failure by returning NULL */
+	if (!tile)
+		elog(ERROR, "%s: unable to unpack serialized tile", __func__);
+
+	ctx = palloc(sizeof(mvt_agg_context));
+	memset(ctx, 0, sizeof(mvt_agg_context));
+	ctx->tile = tile;
+	return ctx;
 }
 
+/*
+ * Duplicate a Value message: all fields are copied as-is, and the
+ * string payload (when present) is duplicated with pstrdup so the copy
+ * owns its own string.
+ */
+static VectorTile__Tile__Value *
+tile_value_copy(const VectorTile__Tile__Value *value)
+{
+	VectorTile__Tile__Value *dup = palloc(sizeof(VectorTile__Tile__Value));
+
+	*dup = *value;
+	if (value->string_value != NULL)
+		dup->string_value = pstrdup(value->string_value);
+
+	return dup;
+}
+
+/*
+ * Duplicate a Feature message into newly allocated memory.
+ *
+ * id and type settings are copied unchanged and the geometry commands
+ * are copied verbatim. Tags are handled as (key, value) index pairs:
+ * key_offset is added to each key index and value_offset to each value
+ * index. Only complete pairs are written; an unpaired trailing tag, if
+ * there is one, is allocated but not filled in.
+ * Returns NULL when feature is NULL.
+ */
+static VectorTile__Tile__Feature *
+tile_feature_copy(const VectorTile__Tile__Feature *feature, int key_offset, int value_offset)
+{
+	VectorTile__Tile__Feature *copy;
+	int pair;
+
+	/* Null in => Null out */
+	if (feature == NULL)
+		return NULL;
+
+	/* Start from an initialized, empty message */
+	copy = palloc(sizeof(VectorTile__Tile__Feature));
+	vector_tile__tile__feature__init(copy);
+
+	/* Scalar settings go over unchanged */
+	copy->has_type = feature->has_type;
+	copy->type = feature->type;
+	copy->has_id = feature->has_id;
+	copy->id = feature->id;
+
+	/* Raw geometry data is taken literally */
+	if (feature->n_geometry > 0)
+	{
+		size_t geom_bytes = sizeof(uint32_t)*feature->n_geometry;
+		copy->n_geometry = feature->n_geometry;
+		copy->geometry = palloc(geom_bytes);
+		memcpy(copy->geometry, feature->geometry, geom_bytes);
+	}
+
+	/* Tags are re-based so they match the dictionaries at the layer level */
+	if (feature->n_tags > 0)
+	{
+		copy->n_tags = feature->n_tags;
+		copy->tags = palloc(sizeof(uint32_t)*feature->n_tags);
+		for (pair = 0; pair < feature->n_tags/2; pair++)
+		{
+			const uint32_t *src = &feature->tags[2*pair];
+			uint32_t *dst = &copy->tags[2*pair];
+			dst[0] = src[0] + key_offset;
+			dst[1] = src[1] + value_offset;
+		}
+	}
+
+	return copy;
+}
+
+/*
+ * Merge two layers into a newly allocated layer.
+ *
+ * version, name and extent come from layer1. The keys and values of
+ * layer2 are appended after those of layer1 (no de-duplication), and
+ * the features of layer2 follow those of layer1. Tag indexes in the
+ * layer2 features are shifted by the number of keys and values that
+ * layer1 contributed, so they keep pointing at the same dictionary
+ * entries in the merged layer.
+ */
+static VectorTile__Tile__Layer *
+vectortile_layer_combine(const VectorTile__Tile__Layer *layer1, const VectorTile__Tile__Layer *layer2)
+{
+	int i, j;
+	int key2_offset, value2_offset;
+	VectorTile__Tile__Layer *layer = palloc(sizeof(VectorTile__Tile__Layer));
+	vector_tile__tile__layer__init(layer);
+
+	/* Take globals from layer1 */
+	layer->version = layer1->version;
+	layer->name = pstrdup(layer1->name);
+	layer->has_extent = layer1->has_extent;
+	layer->extent = layer1->extent;
+
+	/* Copy keys into new layer */
+	j = 0;
+	layer->n_keys = layer1->n_keys + layer2->n_keys;
+	layer->keys = layer->n_keys ? palloc(layer->n_keys * sizeof(void*)) : NULL;
+	for (i = 0; i < layer1->n_keys; i++)
+		layer->keys[j++] = pstrdup(layer1->keys[i]);
+	/* Index of the first layer2 key in the merged key list */
+	key2_offset = j;
+	for (i = 0; i < layer2->n_keys; i++)
+		layer->keys[j++] = pstrdup(layer2->keys[i]);
+
+	/* Copy values into new layer */
+	/* TODO, apply hash logic here too, so that merged tiles */
+	/* retain unique value maps */
+	layer->n_values = layer1->n_values + layer2->n_values;
+	layer->values = layer->n_values ? palloc(layer->n_values * sizeof(void*)) : NULL;
+	j = 0;
+	for (i = 0; i < layer1->n_values; i++)
+		layer->values[j++] = tile_value_copy(layer1->values[i]);
+	/* Index of the first layer2 value in the merged value list */
+	value2_offset = j;
+	for (i = 0; i < layer2->n_values; i++)
+		layer->values[j++] = tile_value_copy(layer2->values[i]);
+
+	/* Copy features into new layer, re-basing the layer2 tag indexes */
+	layer->n_features = layer1->n_features + layer2->n_features;
+	layer->features = layer->n_features ? palloc(layer->n_features * sizeof(void*)) : NULL;
+	j = 0;
+	for (i = 0; i < layer1->n_features; i++)
+		layer->features[j++] = tile_feature_copy(layer1->features[i], 0, 0);
+	/* value2_offset must stay at the layer1 value count here; it is */
+	/* not reassigned to the feature count, which would make layer2 */
+	/* tags reference the wrong values */
+	for (i = 0; i < layer2->n_features; i++)
+		layer->features[j++] = tile_feature_copy(layer2->features[i], key2_offset, value2_offset);
+
+	return layer;
+}
+
+
+/*
+ * Merge two tiles.
+ *
+ * If either tile has no layers, the other input tile is returned
+ * directly (tile1 when both are empty), so the result may alias an
+ * input. Otherwise a new tile is built that contains one merged layer
+ * for every pair of layers, one from each tile, whose names compare
+ * equal. Layers without a same-named partner are not carried over.
+ */
+static VectorTile__Tile *
+vectortile_tile_combine(VectorTile__Tile *tile1, VectorTile__Tile *tile2)
+{
+	int a, b;
+	VectorTile__Tile *merged;
+
+	/* Hopelessly messing up memory ownership here */
+	if (tile2->n_layers == 0)
+		return tile1;
+	if (tile1->n_layers == 0)
+		return tile2;
+
+	merged = palloc(sizeof(VectorTile__Tile));
+	vector_tile__tile__init(merged);
+	merged->n_layers = 0;
+	merged->layers = palloc(sizeof(void*));
+
+	/* Merge all matching layers in the files (basically always only one) */
+	for (a = 0; a < tile1->n_layers; a++)
+	{
+		for (b = 0; b < tile2->n_layers; b++)
+		{
+			VectorTile__Tile__Layer *left = tile1->layers[a];
+			VectorTile__Tile__Layer *right = tile2->layers[b];
+			VectorTile__Tile__Layer *both;
+
+			if (strcmp(left->name, right->name) != 0)
+				continue;
+
+			both = vectortile_layer_combine(left, right);
+			if (both)
+			{
+				merged->layers[merged->n_layers] = both;
+				merged->n_layers++;
+				/* Keep one spare slot at the end of the array */
+				merged->layers = repalloc(merged->layers, (merged->n_layers+1) * sizeof(void*));
+			}
+		}
+	}
+	return merged;
+}
+
+/*
+ * Combine two aggregation contexts.
+ *
+ * When one context is NULL the other is returned as-is (NULL when both
+ * are NULL). When both are present and both carry a tile, a new zeroed
+ * context is returned whose tile is the combination of the two tiles.
+ * When both are present but at least one has no tile, an ERROR is
+ * raised.
+ */
+mvt_agg_context * mvt_ctx_combine(mvt_agg_context *ctx1, mvt_agg_context *ctx2)
+{
+	mvt_agg_context *ctxnew;
+
+	/* One side missing: the result is simply the other side */
+	if (!ctx1)
+		return ctx2;
+	if (!ctx2)
+		return ctx1;
+
+	if (!ctx1->tile || !ctx2->tile)
+	{
+		elog(DEBUG2, "ctx1->tile = %p", ctx1->tile);
+		elog(DEBUG2, "ctx2->tile = %p", ctx2->tile);
+		elog(ERROR, "%s: unable to combine contexts where tile attribute is null", __func__);
+		return NULL;
+	}
+
+	ctxnew = palloc(sizeof(mvt_agg_context));
+	memset(ctxnew, 0, sizeof(mvt_agg_context));
+	ctxnew->tile = vectortile_tile_combine(ctx1->tile, ctx2->tile);
+	return ctxnew;
+}
+
+/**
+ * Finalize aggregation.
+ *
+ * Hands the context to mvt_ctx_to_bytea() and returns the resulting
+ * packed bytea.
+ */
+bytea *mvt_agg_finalfn(mvt_agg_context *ctx)
+{
+	bytea *packed = mvt_ctx_to_bytea(ctx);
+	return packed;
+}
+
+
 #endif

Modified: trunk/postgis/mvt.h
===================================================================
--- trunk/postgis/mvt.h	2017-11-09 16:27:26 UTC (rev 16104)
+++ trunk/postgis/mvt.h	2017-11-09 17:21:45 UTC (rev 16105)
@@ -46,7 +46,7 @@
 
 #include "vector_tile.pb-c.h"
 
-struct mvt_agg_context {
+typedef struct mvt_agg_context {
 	char *name;
 	uint32_t extent;
 	char *geom_name;
@@ -54,6 +54,7 @@
 	HeapTupleHeader row;
 	VectorTile__Tile__Feature *feature;
 	VectorTile__Tile__Layer *layer;
+	VectorTile__Tile *tile;
 	size_t features_capacity;
 	struct mvt_kv_key *keys_hash;
 	struct mvt_kv_string_value *string_values_hash;
@@ -65,14 +66,18 @@
 	uint32_t values_hash_i;
 	uint32_t keys_hash_i;
 	uint32_t c;
-};
+} mvt_agg_context;
 
-LWGEOM *mvt_geom(LWGEOM *geom, const GBOX *bounds, uint32_t extent, uint32_t buffer,
-	bool clip_geom);
-void mvt_agg_init_context(struct mvt_agg_context *ctx);
-void mvt_agg_transfn(struct mvt_agg_context *ctx);
-uint8_t *mvt_agg_finalfn(struct mvt_agg_context *ctx);
+/* Prototypes */
+LWGEOM *mvt_geom(LWGEOM *geom, const GBOX *bounds, uint32_t extent, uint32_t buffer, bool clip_geom);
+void mvt_agg_init_context(mvt_agg_context *ctx);
+void mvt_agg_transfn(mvt_agg_context *ctx);
+bytea *mvt_agg_finalfn(mvt_agg_context *ctx);
+bytea *mvt_ctx_serialize(mvt_agg_context *ctx);
+mvt_agg_context * mvt_ctx_deserialize(const bytea *ba);
+mvt_agg_context * mvt_ctx_combine(mvt_agg_context *ctx1, mvt_agg_context *ctx2);
 
+
 #endif  /* HAVE_LIBPROTOBUF */
 
 #endif

Modified: trunk/postgis/postgis.sql.in
===================================================================
--- trunk/postgis/postgis.sql.in	2017-11-09 16:27:26 UTC (rev 16104)
+++ trunk/postgis/postgis.sql.in	2017-11-09 17:21:45 UTC (rev 16105)
@@ -4476,46 +4476,80 @@
 	AS 'MODULE_PATHNAME', 'pgis_asmvt_finalfn'
 	LANGUAGE 'c' IMMUTABLE _PARALLEL;
 
+-- Availability: 2.5.0
+-- Combine function for the ST_AsMVT aggregates: takes two internal
+-- states and returns one internal state.
+CREATE OR REPLACE FUNCTION pgis_asmvt_combinefn(internal, internal)
+	RETURNS internal
+	AS 'MODULE_PATHNAME', 'pgis_asmvt_combinefn'
+	LANGUAGE 'c' IMMUTABLE _PARALLEL;
+
+-- Availability: 2.5.0
+-- Serialization function for the ST_AsMVT aggregates: turns an
+-- internal state into a bytea.
+CREATE OR REPLACE FUNCTION pgis_asmvt_serialfn(internal)
+	RETURNS bytea
+	AS 'MODULE_PATHNAME', 'pgis_asmvt_serialfn'
+	LANGUAGE 'c' IMMUTABLE _PARALLEL;
+
+-- Availability: 2.5.0
+-- Deserialization function for the ST_AsMVT aggregates: turns a bytea
+-- back into an internal state. The second (internal) argument is not
+-- read by the C implementation.
+CREATE OR REPLACE FUNCTION pgis_asmvt_deserialfn(bytea, internal)
+	RETURNS internal
+	AS 'MODULE_PATHNAME', 'pgis_asmvt_deserialfn'
+	LANGUAGE 'c' IMMUTABLE _PARALLEL;
+
 -- Availability: 2.4.0
+-- Changed: 2.5.0 added serialfunc, deserialfunc and combinefunc (only when POSTGIS_PGSQL_VERSION >= 96)
 CREATE AGGREGATE ST_AsMVT(anyelement)
 (
 	sfunc = pgis_asmvt_transfn,
 	stype = internal,
 #if POSTGIS_PGSQL_VERSION >= 96
 	parallel = safe,
+	serialfunc = pgis_asmvt_serialfn,
+	deserialfunc = pgis_asmvt_deserialfn,
+	combinefunc = pgis_asmvt_combinefn,
 #endif
 	finalfunc = pgis_asmvt_finalfn
 );
 
 -- Availability: 2.4.0
+-- Changed: 2.5.0 added serialfunc, deserialfunc and combinefunc (only when POSTGIS_PGSQL_VERSION >= 96)
 CREATE AGGREGATE ST_AsMVT(anyelement, text)
 (
 	sfunc = pgis_asmvt_transfn,
 	stype = internal,
 #if POSTGIS_PGSQL_VERSION >= 96
 	parallel = safe,
+	serialfunc = pgis_asmvt_serialfn,
+	deserialfunc = pgis_asmvt_deserialfn,
+	combinefunc = pgis_asmvt_combinefn,
 #endif
 	finalfunc = pgis_asmvt_finalfn
 );
 
 -- Availability: 2.4.0
+-- Changed: 2.5.0 added serialfunc, deserialfunc and combinefunc (only when POSTGIS_PGSQL_VERSION >= 96)
 CREATE AGGREGATE ST_AsMVT(anyelement, text, int4)
 (
 	sfunc = pgis_asmvt_transfn,
 	stype = internal,
 #if POSTGIS_PGSQL_VERSION >= 96
 	parallel = safe,
+	serialfunc = pgis_asmvt_serialfn,
+	deserialfunc = pgis_asmvt_deserialfn,
+	combinefunc = pgis_asmvt_combinefn,
 #endif
 	finalfunc = pgis_asmvt_finalfn
 );
 
 -- Availability: 2.4.0
+-- Changed: 2.5.0 added serialfunc, deserialfunc and combinefunc (only when POSTGIS_PGSQL_VERSION >= 96)
 CREATE AGGREGATE ST_AsMVT(anyelement, text, int4, text)
 (
 	sfunc = pgis_asmvt_transfn,
 	stype = internal,
 #if POSTGIS_PGSQL_VERSION >= 96
 	parallel = safe,
+	serialfunc = pgis_asmvt_serialfn,
+	deserialfunc = pgis_asmvt_deserialfn,
+	combinefunc = pgis_asmvt_combinefn,
 #endif
 	finalfunc = pgis_asmvt_finalfn
 );



More information about the postgis-tickets mailing list