[GRASS-SVN] r32509 - in grass/trunk: include include/Make include/iostream lib lib/iostream raster/r.terraflow

svn_grass at osgeo.org svn_grass at osgeo.org
Mon Aug 4 08:21:12 EDT 2008


Author: pkelly
Date: 2008-08-04 08:21:11 -0400 (Mon, 04 Aug 2008)
New Revision: 32509

Added:
   grass/trunk/include/iostream/
   grass/trunk/include/iostream/ami.h
   grass/trunk/include/iostream/ami_config.h
   grass/trunk/include/iostream/ami_sort.h
   grass/trunk/include/iostream/ami_sort_impl.h
   grass/trunk/include/iostream/ami_stream.h
   grass/trunk/include/iostream/embuffer.h
   grass/trunk/include/iostream/empq.h
   grass/trunk/include/iostream/empq_adaptive.h
   grass/trunk/include/iostream/empq_adaptive_impl.h
   grass/trunk/include/iostream/empq_impl.h
   grass/trunk/include/iostream/imbuffer.h
   grass/trunk/include/iostream/mem_stream.h
   grass/trunk/include/iostream/minmaxheap.h
   grass/trunk/include/iostream/mm.h
   grass/trunk/include/iostream/mm_utils.h
   grass/trunk/include/iostream/pqheap.h
   grass/trunk/include/iostream/queue.h
   grass/trunk/include/iostream/quicksort.h
   grass/trunk/include/iostream/replacementHeap.h
   grass/trunk/include/iostream/replacementHeapBlock.h
   grass/trunk/include/iostream/rtimer.h
   grass/trunk/lib/iostream/
   grass/trunk/lib/iostream/Makefile
   grass/trunk/lib/iostream/ami_stream.cc
   grass/trunk/lib/iostream/minmaxheap_test.cc
   grass/trunk/lib/iostream/mm.cc
   grass/trunk/lib/iostream/mm_utils.cc
   grass/trunk/lib/iostream/rtimer.cc
Removed:
   grass/trunk/raster/r.terraflow/IOStream/
Modified:
   grass/trunk/include/Make/Grass.make.in
   grass/trunk/lib/Makefile
   grass/trunk/raster/r.terraflow/3scan.h
   grass/trunk/raster/r.terraflow/Makefile
   grass/trunk/raster/r.terraflow/ccforest.h
   grass/trunk/raster/r.terraflow/common.h
   grass/trunk/raster/r.terraflow/direction.h
   grass/trunk/raster/r.terraflow/fill.h
   grass/trunk/raster/r.terraflow/filldepr.cc
   grass/trunk/raster/r.terraflow/filldepr.h
   grass/trunk/raster/r.terraflow/genericWindow.h
   grass/trunk/raster/r.terraflow/grass2str.h
   grass/trunk/raster/r.terraflow/grid.h
   grass/trunk/raster/r.terraflow/nodata.cc
   grass/trunk/raster/r.terraflow/nodata.h
   grass/trunk/raster/r.terraflow/plateau.cc
   grass/trunk/raster/r.terraflow/plateau.h
   grass/trunk/raster/r.terraflow/sortutils.h
   grass/trunk/raster/r.terraflow/stats.h
   grass/trunk/raster/r.terraflow/streamutils.h
   grass/trunk/raster/r.terraflow/sweep.cc
   grass/trunk/raster/r.terraflow/sweep.h
   grass/trunk/raster/r.terraflow/water.cc
Log:
Move iostream library to lib/iostream and includes to include/iostream,
and update r.terraflow to use it - in preparation for introducing
r.viewshed.


Modified: grass/trunk/include/Make/Grass.make.in
===================================================================
--- grass/trunk/include/Make/Grass.make.in	2008-08-04 09:36:34 UTC (rev 32508)
+++ grass/trunk/include/Make/Grass.make.in	2008-08-04 12:21:11 UTC (rev 32509)
@@ -112,6 +112,7 @@
 ICON_LIBNAME          = grass_icon
 IMAGERY_LIBNAME       = grass_imagery
 IORTHO_LIBNAME        = grass_Iortho
+IOSTREAM_LIBNAME      = grass_iostream
 ISMAP_LIBNAME         = grass_ismap
 LINKM_LIBNAME         = grass_linkm
 LOCK_LIBNAME          = grass_lock
@@ -209,6 +210,7 @@
 ICONLIB       = -l$(ICON_LIBNAME)
 IMAGERYLIB    = -l$(IMAGERY_LIBNAME) $(GISLIB) 
 IORTHOLIB     = -l$(IORTHO_LIBNAME) $(IMAGERYLIB) $(GISLIB) 
+IOSTREAMLIB   = -l$(IOSTREAM_LIBNAME)
 ISMAPLIB      = -l$(ISMAP_LIBNAME)
 LINKMLIB      = -l$(LINKM_LIBNAME)
 LOCKLIB       = -l$(LOCK_LIBNAME)
@@ -307,6 +309,7 @@
 ICONDEP     = $(ARCH_LIBDIR)/$(LIB_PREFIX)$(ICON_LIBNAME)$(LIB_SUFFIX)
 IMAGERYDEP  = $(ARCH_LIBDIR)/$(LIB_PREFIX)$(IMAGERY_LIBNAME)$(LIB_SUFFIX)
 IORTHODEP   = $(ARCH_LIBDIR)/$(LIB_PREFIX)$(IORTHO_LIBNAME)$(LIB_SUFFIX)
+IOSTREAMDEP = $(ARCH_LIBDIR)/$(LIB_PREFIX)$(IOSTREAM_LIBNAME)$(LIB_SUFFIX)
 LINKMDEP    = $(ARCH_LIBDIR)/$(LIB_PREFIX)$(LINKM_LIBNAME)$(LIB_SUFFIX)
 LOCKDEP     = $(ARCH_LIBDIR)/$(LIB_PREFIX)$(LOCK_LIBNAME)$(LIB_SUFFIX)
 RASTERDEP   = $(ARCH_LIBDIR)/$(LIB_PREFIX)$(RASTER_LIBNAME)$(LIB_SUFFIX)

Copied: grass/trunk/include/iostream/ami.h (from rev 32508, grass/trunk/raster/r.terraflow/IOStream/include/ami.h)
===================================================================
--- grass/trunk/include/iostream/ami.h	                        (rev 0)
+++ grass/trunk/include/iostream/ami.h	2008-08-04 12:21:11 UTC (rev 32509)
@@ -0,0 +1,46 @@
+/****************************************************************************
+ * 
+ *  MODULE:	r.terraflow
+ *
+ *  COPYRIGHT (C) 2007 Laura Toma
+ *   
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *****************************************************************************/
+
+
+#ifndef _AMI_H
+#define _AMI_H
+
+//debug flags
+#include "ami_config.h"
+
+//typedefs, stream
+#include "ami_stream.h"
+
+//memory manager
+#include "mm.h"
+#include "mm_utils.h"
+
+#include "ami_sort.h"
+
+//data structures
+#include "queue.h"
+#include "pqheap.h"
+//#include "empq.h"
+#include "empq_impl.h"
+//#include "empq_adaptive.h"
+#include "empq_adaptive_impl.h"
+
+//timer
+#include "rtimer.h"
+
+#endif // _AMI_H 

Copied: grass/trunk/include/iostream/ami_config.h (from rev 32508, grass/trunk/raster/r.terraflow/IOStream/include/ami_config.h)
===================================================================
--- grass/trunk/include/iostream/ami_config.h	                        (rev 0)
+++ grass/trunk/include/iostream/ami_config.h	2008-08-04 12:21:11 UTC (rev 32509)
@@ -0,0 +1,43 @@
+/****************************************************************************
+ * 
+ *  MODULE:	r.terraflow
+ *
+ *  COPYRIGHT (C) 2007 Laura Toma
+ *   
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *****************************************************************************/
+
+#ifndef _ami_config_h
+#define _ami_config_h
+
+
+
+//CHOOSE PQUEUE IMPLEMENTATION 
+//------------------------------------------------------------
+//#define IM_PQUEUE
+//#define EM_PQUEUE
+#define EMPQ_ADAPTIVE
+
+
+//maximize memory usage by keeping streams on disk
+//------------------------------------------------------------
+#if (defined EM_PQUEUE || defined EMPQ_ADAPTIVE)
+//enables keeping streams on disk, rather than in memory;
+#define SAVE_MEMORY
+#endif
+
+
+#if (defined EMPQ_ADAPTIVE && !defined SAVE_MEMORY)
+#error  EMPQ_ADAPTIVE requires SAVE_MEMORY set
+#endif
+
+#endif

Copied: grass/trunk/include/iostream/ami_sort.h (from rev 32508, grass/trunk/raster/r.terraflow/IOStream/include/ami_sort.h)
===================================================================
--- grass/trunk/include/iostream/ami_sort.h	                        (rev 0)
+++ grass/trunk/include/iostream/ami_sort.h	2008-08-04 12:21:11 UTC (rev 32509)
@@ -0,0 +1,165 @@
+/****************************************************************************
+ * 
+ *  MODULE:	r.terraflow
+ *
+ *  COPYRIGHT (C) 2007 Laura Toma
+ *   
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *****************************************************************************/
+
+
+#ifndef _AMI_SORT_H
+#define _AMI_SORT_H
+
+#include "ami_sort_impl.h"
+
+#define SORT_DEBUG if(0)
+
+
+/* ---------------------------------------------------------------------- */
+
+// A version of AMI_sort that takes an input streamof elements of type
+// T, creates an output stream and and uses the < operator to sort
+
+// instream is allocated;  *outstream is created
+// template<class T>
+// AMI_err 
+// AMI_sort(AMI_STREAM<T> *instream, AMI_STREAM<T> **outstream) {
+
+//   cout << "Not implemented yet!\n";
+//   exit(1);
+//   return AMI_ERROR_NO_ERROR;
+// }
+
+
+
+/* ---------------------------------------------------------------------- */
+
+// A version of AMI_sort that takes an input stream of elements of
+// type T, creates an output stream, and a user-specified comparison
+// function
+
+// instream is allocated;  *outstream is created
+// template<class T>
+// AMI_err AMI_sort(AMI_STREAM<T> *instream, AMI_STREAM<T> **outstream,
+//                  int (*cmp)(const T&, const  T&)) {
+
+//   cout << "Not implemented yet!\n";
+//   exit(1);
+//   return AMI_ERROR_NO_ERROR;
+// }
+
+
+
+/* ---------------------------------------------------------------------- */
+// A version of AMI_sort that takes an input stream of elements of
+// type T, creates an output stream, and a user-specified comparison
+// object. 
+
+//The comparison object "cmp", of (user-defined) class represented by
+//CMPR, must have a member function called "compare" which is used for
+//sorting the input stream.
+
+
+
+
+//  create  *outstream 
+template<class T, class Compare>
+AMI_err 
+AMI_sort(AMI_STREAM<T> *instream, AMI_STREAM<T> **outstream, Compare *cmp, 
+	 int deleteInputStream = 0) {
+  char* name;
+  queue<char*>* runList;
+  int instreamLength;
+
+  assert(instream && outstream && cmp); 
+  instreamLength = instream->stream_len();
+
+  if (instreamLength == 0) {
+    *outstream = new AMI_STREAM<T>();
+    if (deleteInputStream) {
+      delete instream;
+    }
+    return AMI_ERROR_NO_ERROR;
+  }
+  
+  SORT_DEBUG {
+    instream->name(&name);
+    cout << "AMI_sort: sorting stream" << name <<", len=" 
+	 << instreamLength << endl;
+    delete name;
+    MM_manager.print();
+  }
+  
+  //run formation
+  runList = runFormation(instream, cmp);
+  assert(runList && runList->length() > 0); 
+
+  if (deleteInputStream) {
+    delete instream;
+  }
+
+  if (runList->length() == 1) {
+    //if 1 run only
+    runList->dequeue(&name);
+    *outstream = new AMI_STREAM<T>(name);
+    delete name; //should be safe, stream makes its own copy
+  } else {
+    *outstream = multiMerge<T,Compare>(runList,  cmp);
+    //i thought the templates are not needed in the call, but seems to
+    //help the compiler..laura
+  }
+
+  assert(runList->length() == 0);
+  delete runList;
+  
+  SORT_DEBUG {
+    cout << "AMI_sort: done" << endl << endl;
+    MM_manager.print();
+  }
+
+  assert(*outstream);  
+  assert((*outstream)->stream_len() == instreamLength);
+  return AMI_ERROR_NO_ERROR;
+  
+}
+
+
+
+template<class  T, class Compare>
+int
+isSorted(AMI_STREAM<T> *str, Compare cmp) {
+  T *prev, *crt;
+  AMI_err ae;   
+
+  assert(str);
+  str->seek(0);
+  
+  if (str->stream_len() <2) return 1;
+  
+  ae = str->read_item(&crt);
+  cout << "reading: " << *crt << endl;
+  prev = new T (*crt);
+  ae = str->read_item(&crt);
+  while (ae == AMI_ERROR_NO_ERROR) {
+    cout << "reading: " << *crt << endl;
+    if (cmp.compare(*prev, *crt) != -1)
+      assert(0);
+      return 0;
+    prev = crt;
+    ae = str->read_item(&crt);
+  }
+  return 1;
+}
+             
+                          
+#endif // _AMI_SORT_H 

Copied: grass/trunk/include/iostream/ami_sort_impl.h (from rev 32508, grass/trunk/raster/r.terraflow/IOStream/include/ami_sort_impl.h)
===================================================================
--- grass/trunk/include/iostream/ami_sort_impl.h	                        (rev 0)
+++ grass/trunk/include/iostream/ami_sort_impl.h	2008-08-04 12:21:11 UTC (rev 32509)
@@ -0,0 +1,362 @@
+/****************************************************************************
+ * 
+ *  MODULE:	r.terraflow
+ *
+ *  COPYRIGHT (C) 2007 Laura Toma
+ *   
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *****************************************************************************/
+
+
+#ifndef AMI_SORT_IMPL_H
+#define AMI_SORT_IMPL_H
+
+#include "ami_stream.h"
+#include "mem_stream.h"
+#include "mm.h"
+#include "quicksort.h"
+#include "queue.h"
+#include "replacementHeap.h"
+#include "replacementHeapBlock.h"
+
+#define SDEBUG if(0)
+
+
+/* if this flag is defined, a run will be split into blocks, each
+   block sorted and then all blocks merged */
+#define BLOCKED_RUN 
+
+
+
+/* ---------------------------------------------------------------------- */
+//set run_size, last_run_size and nb_runs depending on how much memory
+//is available
+template<class T>
+static void 
+initializeRunFormation(AMI_STREAM<T> *instream,
+		       size_t &run_size, size_t &last_run_size, 
+		       unsigned int &nb_runs) {
+  
+  off_t strlen = instream->stream_len();  
+  size_t mm_avail = MM_manager.memory_available();
+#ifdef BLOCKED_RUN
+  // not in place, can only use half memory 
+  mm_avail = mm_avail/2;
+#endif
+  
+  run_size = mm_avail/sizeof(T);
+
+  if (strlen == 0) {
+    nb_runs = last_run_size = 0;
+  } else {
+    if (strlen % run_size == 0) {
+      nb_runs = strlen/run_size;
+      last_run_size = run_size;
+    } else {
+      nb_runs = strlen/run_size + 1;
+      last_run_size = strlen % run_size;
+    }
+  }
+  SDEBUG cout << "nb_runs=" << nb_runs 
+	      << ", run_size=" << run_size 
+	      << ", last_run_size=" << last_run_size
+	      << "\n"; 
+}
+
+
+
+/* ---------------------------------------------------------------------- */
+/* data is allocated; read run_size elements from stream into data and
+   sort them using quicksort */
+template<class T, class Compare>
+void makeRun_Block(AMI_STREAM<T> *instream, T* data, 
+		   unsigned int run_size, Compare *cmp) {
+  AMI_err err;
+  
+  //read next run from input stream
+  err = instream->read_array(data, run_size); 
+  assert(err == AMI_ERROR_NO_ERROR);
+  
+  //sort it in memory in place
+  quicksort(data, run_size, *cmp);
+  
+}
+
+
+/* ---------------------------------------------------------------------- */
+/* data is allocated; read run_size elements from stream into data and
+   sort them using quicksort; instead of reading the whole chunk at
+   once, it reads it in blocks, sorts each block and then merges the
+   blocks together. Note: it is not in place! it allocates another
+   array of same size as data, writes the sorted run into it and
+   deteles data, and replaces data with outdata */
+template<class T, class Compare>
+void makeRun(AMI_STREAM<T> *instream, T* &data, 
+	     int run_size, Compare *cmp) {
+  
+  unsigned int nblocks, last_block_size, crt_block_size, block_size; 
+
+  block_size = STREAM_BUFFER_SIZE;
+
+  if (run_size % block_size == 0) {
+    nblocks = run_size / block_size;
+    last_block_size = block_size;
+  } else { 
+    nblocks = run_size / block_size + 1;
+    last_block_size = run_size % block_size;
+  }
+  
+  //create queue of blocks waiting to be merged
+  queue<MEM_STREAM<T> *> *blockList;
+  MEM_STREAM<T>* str;
+  blockList  = new  queue<MEM_STREAM<T> *>(nblocks);
+  for (unsigned int i=0; i < nblocks; i++) {
+    crt_block_size = (i == nblocks-1) ? last_block_size: block_size;
+    makeRun_Block(instream, &(data[i*block_size]), crt_block_size, cmp);
+    str = new MEM_STREAM<T>( &(data[i*block_size]), crt_block_size);
+    blockList->enqueue(str);
+  }
+  assert(blockList->length() == nblocks);
+  
+  //now data consists of sorted blocks: merge them 
+  ReplacementHeapBlock<T,Compare> rheap(blockList);
+  SDEBUG rheap.print(cerr);
+  int i = 0;
+  T elt;  
+  T* outdata = new T [run_size];
+  while (!rheap.empty()) {
+    elt = rheap.extract_min();
+    outdata[i] = elt; 
+    //SDEBUG cerr << "makeRun: written " << elt << endl;
+    i++;
+  }
+  assert( i == run_size &&  blockList->length() == 0);
+  delete blockList;
+ 
+  T* tmp = data; 
+  delete [] tmp;
+  data = outdata;
+}
+
+
+
+/* ---------------------------------------------------------------------- */
+
+//partition instream in streams that fit in main memory, sort each
+//stream, remember its name, make it persistent and store it on
+//disk. if entire stream fits in memory, sort it and store it and
+//return it.
+
+//assume instream is allocated prior to the call.
+// set nb_runs and allocate runNames.
+
+//The comparison object "cmp", of (user-defined) class represented by
+//Compare, must have a member function called "compare" which is used
+//for sorting the input stream.  
+
+template<class T, class Compare>
+queue<char*>*
+runFormation(AMI_STREAM<T> *instream, Compare *cmp) {
+ 
+  size_t run_size,last_run_size, crt_run_size;
+  unsigned int nb_runs;
+  queue<char*>* runList;
+  T* data;
+  AMI_STREAM<T>* str;
+  char* strname;
+  
+  assert(instream && cmp);
+  SDEBUG cout << "runFormation: ";
+  SDEBUG MM_manager.print();
+  
+  //rewind file
+  instream->seek(0); //should check error xxx
+  
+  //estimate run_size, last_run_size and nb_runs
+  initializeRunFormation(instream, run_size, last_run_size, nb_runs);
+  
+  //create runList
+  runList = new queue<char*>(nb_runs);
+  
+  //allocate space for a run
+  if (nb_runs <= 1) {
+    //don't waste space if input stream is smaller than run_size
+    data = new T[last_run_size];
+  } else {
+    data = new T[run_size];
+  }
+  SDEBUG MM_manager.print();
+  
+  for (size_t i=0; i< nb_runs; i++) {
+    
+    crt_run_size = (i == nb_runs-1) ? last_run_size: run_size;
+    
+    //SDEBUG cout << "i=" << i << ":  runsize=" << crt_run_size << ", ";  
+
+#ifdef BLOCKED_RUN
+    makeRun(instream, data, crt_run_size, cmp);
+#else        
+    makeRun_Block(instream, data, crt_run_size, cmp);
+#endif
+    SDEBUG MM_manager.print();
+
+    //read next run from input stream
+    //err = instream->read_array(data, crt_run_size); 
+    //assert(err == AMI_ERROR_NO_ERROR);
+    //sort it in memory in place
+    //quicksort(data, crt_run_size, *cmp);
+    
+    //create a new stream to hold this run 
+    str = new AMI_STREAM<T>();
+    str->write_array(data, crt_run_size);
+    assert(str->stream_len() == crt_run_size);
+    
+    //remember this run's name
+    str->name(&strname);
+    runList->enqueue(strname);
+    
+    //delete the stream -- should not keep too many streams open
+    str->persist(PERSIST_PERSISTENT);
+    delete str;
+  };
+  SDEBUG MM_manager.print();
+  //release the run memory!
+  delete [] data;
+  
+  SDEBUG cout << "runFormation: done.\n";
+  SDEBUG MM_manager.print();
+
+  return runList;
+};
+
+
+
+
+
+
+/* ---------------------------------------------------------------------- */
+
+//this is one pass of merge; estimate max possible merge arity <ar>
+//and merge the first <ar> streams from runList ; create and return
+//the resulting stream (does not add it to the queue -- the calling
+//function will do that)
+
+//input streams are assumed to be sorted, and are not necessarily of
+//the same length.
+
+//streamList does not contains streams, but names of streams, which
+//must be opened in order to be merged
+
+//The comparison object "cmp", of (user-defined) class represented by
+//Compare, must have a member function called "compare" which is used
+//for sorting the input stream.  
+
+
+template<class T, class Compare>
+AMI_STREAM<T>* 
+singleMerge(queue<char*>* streamList, Compare *cmp) {
+  AMI_STREAM<T>* mergedStr;
+  size_t mm_avail, blocksize;
+  unsigned int arity, max_arity; 
+  T elt;
+
+  assert(streamList && cmp);
+
+  SDEBUG cout << "singleMerge: ";
+
+  //estimate max possible merge arity with available memory (approx M/B)
+  mm_avail = MM_manager.memory_available();
+  blocksize = getpagesize();
+  //should use AMI function, but there's no stream at this point
+  // AMI_STREAM<T>::main_memory_usage(&blocksize, MM_STREAM_USAGE_BUFFER);
+  max_arity = mm_avail/blocksize;
+  arity = (streamList->length() < max_arity) ? 
+    streamList->length() :  max_arity;
+
+  SDEBUG cout << "arity=" << arity << " (max_arity=" <<max_arity<< ")\n"; 
+  
+  //create output stream
+  mergedStr = new AMI_STREAM<T>();
+  
+  ReplacementHeap<T,Compare> rheap(arity, streamList);
+  SDEBUG rheap.print(cerr);
+
+  int i = 0;
+  while (!rheap.empty()) {
+    //mergedStr->write_item( rheap.extract_min() );
+    //xxx should check error here
+    elt = rheap.extract_min();
+    mergedStr->write_item(elt);
+    //SDEBUG cerr << "smerge: written " << elt << endl;
+    i++;
+  }
+  
+  SDEBUG cout << "..done\n";
+
+  return mergedStr;
+}
+
+
+
+
+/* ---------------------------------------------------------------------- */
+
+//merge runs whose names are given by runList; this may entail
+//multiple passes of singleMerge();
+
+//return the resulting output stream
+
+//input streams are assumed to be sorted, and are not necessarily of
+//the same length.
+
+//The comparison object "cmp", of (user-defined) class represented by
+//Compare, must have a member function called "compare" which is used
+//for sorting the input stream.  
+
+
+template<class T, class Compare>
+AMI_STREAM<T>* 
+multiMerge(queue<char*>* runList, Compare *cmp) {
+  AMI_STREAM<T> * mergedStr= NULL;
+  char* path;
+  
+  assert(runList && runList->length() > 1  && cmp);
+
+  SDEBUG cout << "multiMerge: " << runList->length() << " runs"  << endl;
+  
+  while (runList->length() > 1) {
+    
+    //merge streams from streamlist into mergedStr
+    mergedStr = singleMerge<T,Compare>(runList, cmp);
+    //i thought the templates are not needed in the call, but seems to
+    //help the compiler..laura
+    assert(mergedStr);
+    
+    //if more runs exist, delete this stream and add it to list
+    if (runList->length() > 0) {
+      mergedStr->name(&path);    
+      runList->enqueue(path);
+      mergedStr->persist(PERSIST_PERSISTENT);
+      delete mergedStr;
+    }
+  }
+
+  assert(runList->length() == 0);
+  assert(mergedStr);
+  return mergedStr;
+}
+
+
+
+
+#endif
+  

Copied: grass/trunk/include/iostream/ami_stream.h (from rev 32508, grass/trunk/raster/r.terraflow/IOStream/include/ami_stream.h)
===================================================================
--- grass/trunk/include/iostream/ami_stream.h	                        (rev 0)
+++ grass/trunk/include/iostream/ami_stream.h	2008-08-04 12:21:11 UTC (rev 32509)
@@ -0,0 +1,554 @@
+/****************************************************************************
+ * 
+ *  MODULE:	r.terraflow
+ *
+ *  COPYRIGHT (C) 2007 Laura Toma
+ *   
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *****************************************************************************/
+
+
+#ifndef _AMI_STREAM_H
+#define _AMI_STREAM_H
+
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <unistd.h>
+
+#include <iostream>
+#include <cstring>
+using namespace std;
+
+#include "mm.h" // Get the memory manager.
+
+#ifdef __MINGW32__
+#define getpagesize() (4096)
+#endif
+
+#define DEBUG_DELETE if(0)
+
+// The name of the environment variable which keeps the name of the
+// directory where streams are stored
+#define STREAM_TMPDIR "STREAM_DIR"
+
+// All streams will be names STREAM_*****
+#define BASE_NAME "STREAM"
+
+#define STREAM_BUFFER_SIZE (1<<15)
+
+
+//
+// AMI error codes are returned using the AMI_err type.
+//
+enum AMI_err {
+  AMI_ERROR_NO_ERROR = 0,
+  AMI_ERROR_IO_ERROR,
+  AMI_ERROR_END_OF_STREAM,
+  AMI_ERROR_OUT_OF_RANGE,
+  AMI_ERROR_READ_ONLY,
+  AMI_ERROR_OS_ERROR,
+  AMI_ERROR_MM_ERROR,
+  AMI_ERROR_OBJECT_INITIALIZATION,
+  AMI_ERROR_PERMISSION_DENIED,
+  AMI_ERROR_INSUFFICIENT_MAIN_MEMORY,
+  AMI_ERROR_INSUFFICIENT_AVAILABLE_STREAMS,
+  AMI_ERROR_ENV_UNDEFINED,
+  AMI_ERROR_NO_MAIN_MEMORY_OPERATION,
+};
+
+
+//
+// AMI stream types passed to constructors
+//
+enum AMI_stream_type {
+    AMI_READ_STREAM = 1,	// Open existing stream for reading
+    AMI_WRITE_STREAM,		// Open for writing.  Create if non-existent
+    AMI_APPEND_STREAM,		// Open for writing at end.  Create if needed.
+    AMI_READ_WRITE_STREAM	// Open to read and write.
+};
+
+
+
+
+enum persistence {
+    // Delete the stream from the disk when it is destructed.
+  PERSIST_DELETE = 0,
+  // Do not delete the stream from the disk when it is destructed.
+  PERSIST_PERSISTENT,
+  // Delete each block of data from the disk as it is read.
+  PERSIST_READ_ONCE
+};
+
+
+
+template<class T> 
+class AMI_STREAM {
+private:
+  FILE * fp;
+  //int fd;	//descriptor of file
+  AMI_stream_type  access_mode;
+  char path[BUFSIZ];
+  persistence per;
+
+  //0 for streams, positive for substreams
+  unsigned int substream_level;
+  
+  // If this stream is actually a substream, these will be set to
+  // indicate the portion of the file that is part of this stream.  If
+  // the stream is the whole file, they will be set to -1. Both are in
+  // T units.
+  off_t logical_bos;
+  off_t logical_eos;
+
+  //stream buffer passed in the call to setvbuf when file is opened
+  char* buf;
+
+protected:
+  unsigned int get_block_length();
+  
+public:
+   T read_tmp;
+
+  // An AMI_stream with default name
+  AMI_STREAM();
+  
+  // An AMI stream based on a specific path name.
+  AMI_STREAM(const char *path_name, 
+	     //	     AMI_stream_type st = AMI_READ_WRITE_STREAM);
+	     AMI_stream_type st);
+  
+  // A psuedo-constructor for substreams.
+  AMI_err new_substream(AMI_stream_type st, off_t sub_begin, off_t sub_end,
+			AMI_STREAM<T> **sub_stream);
+  
+  // Destructor
+  ~AMI_STREAM(void);
+  
+  // Read and write elements.
+  AMI_err read_item(T **elt);
+
+  AMI_err write_item(const T &elt);
+  
+  AMI_err read_array(T *data, off_t len);
+  
+  AMI_err write_array(const T *data, off_t len);
+  
+  // Return the number of items in the stream.
+  off_t stream_len(void);
+  
+  // Return the path name of this stream.
+  AMI_err name(char **stream_name);
+  
+  // Move to a specific item in the stream.
+  AMI_err seek(off_t offset);
+  
+  // Query memory usage
+  AMI_err main_memory_usage(size_t *usage,
+			    //MM_stream_usage usage_type= MM_STREAM_USAGE_OVERHEAD);
+			    MM_stream_usage usage_type);
+  
+  void persist(persistence p);
+  
+  char *sprint();
+};
+
+
+/**********************************************************************/
+template<class T>
+unsigned int AMI_STREAM<T>::get_block_length() {
+  return getpagesize();
+}
+
+
+/**********************************************************************/
+/* creates a random file name, opens the file for reading and writing
+   and and returns a file descriptor */
+int ami_single_temp_name(const std::string& base, char* tmp_path);
+
+
+/**********************************************************************/
+/* given fd=fide descriptor, associates with it a stream aopened in
+   access_mode and returns it */
+FILE*  open_stream(int fd, AMI_stream_type st);
+
+
+/**********************************************************************/
+/* open the file whose name is pathname in access mode */
+FILE* open_stream(char* pathname, AMI_stream_type st);
+
+
+
+
+/********************************************************************/
+//  An  AMI stream with default name.
+template<class T>
+AMI_STREAM<T>::AMI_STREAM() {
+  
+  access_mode = AMI_READ_WRITE_STREAM;
+  int fd = ami_single_temp_name(BASE_NAME, path);
+  fp = open_stream(fd, access_mode);
+
+  
+  /* a stream is by default buffered with a buffer of size BUFSIZ=1K */
+  buf = new char[STREAM_BUFFER_SIZE];
+  if (setvbuf(fp, buf, _IOFBF, STREAM_BUFFER_SIZE) != 0) {
+    cerr << "setvbuf failed (stream " << path << ")" << endl;
+    exit(1);
+  }
+  
+  // By default, all streams are deleted at destruction time.
+  per = PERSIST_DELETE;
+
+   // Not a substream.
+  substream_level  = 0;
+  logical_bos = logical_eos = -1;
+
+  seek(0);
+
+  // Register memory usage before returning.
+  //size_t usage; 
+  //main_memory_usage(&usage,  MM_STREAM_USAGE_CURRENT);
+  //MM_manager.register_allocation(usage);
+}
+
+
+
+/**********************************************************************/
+// An AMI stream based on a specific path name.
+template<class T>
+AMI_STREAM<T>::AMI_STREAM(const char *path_name,
+			  AMI_stream_type st = AMI_READ_WRITE_STREAM) {
+  
+  access_mode = st;
+  strcpy(path, path_name);
+  fp = open_stream(path, st);
+  
+
+  /* a stream is by default buffered with a buffer of size BUFSIZ=1K */
+  buf = new char[STREAM_BUFFER_SIZE];
+  if (setvbuf(fp, buf, _IOFBF, STREAM_BUFFER_SIZE) != 0) {
+    cerr << "setvbuf failed (stream " << path << ")" << endl;
+    exit(1);
+  }
+
+  // By default, all streams are deleted at destruction time.
+  per = PERSIST_DELETE;
+  
+  // Not a substream.
+  substream_level  = 0;
+  logical_bos = logical_eos = -1;
+  
+  seek(0);
+
+  // Register memory usage before returning.
+  //size_t usage; 
+  //main_memory_usage(&usage,  MM_STREAM_USAGE_CURRENT);
+  //MM_manager.register_allocation(usage);
+};
+
+
+
+/**********************************************************************/
+ // A psuedo-constructor for substreams.
+template<class T>
+AMI_err AMI_STREAM<T>::new_substream(AMI_stream_type st,
+				     off_t sub_begin,
+				     off_t sub_end,
+				     AMI_STREAM<T> **sub_stream) {
+
+  //assume this for now
+  assert(st == AMI_READ_STREAM);
+
+  //check range
+  if (substream_level) {
+     if( (sub_begin >= (logical_eos - logical_bos)) ||
+	 (sub_end >= (logical_eos - logical_bos)) ) {
+       
+       return AMI_ERROR_OUT_OF_RANGE;
+     }
+  }  else {
+    off_t len = stream_len();
+    if (sub_begin > len || sub_end > len) {
+
+      return AMI_ERROR_OUT_OF_RANGE;
+    }
+  }
+
+  //reopen the file 
+  AMI_STREAM<T> *substr = new AMI_STREAM<T>(path, st);
+  
+  // Set up the beginning and end positions.
+  if (substream_level) {
+    substr->logical_bos = logical_bos + sub_begin;
+    substr->logical_eos = logical_bos + sub_end + 1;
+  } else {
+    substr->logical_bos = sub_begin;
+    substr->logical_eos = sub_end + 1;
+  }
+  
+  // Set the current position.
+  substr->seek(0);
+  
+  //set substream level
+  substr->substream_level = substream_level + 1;
+
+  //set persistence
+  substr->per = per;
+
+  //*sub_stream = (AMI_base_stream < T > *)substr;
+  *sub_stream = substr;
+  return  AMI_ERROR_NO_ERROR;
+};
+
+
+
+/**********************************************************************/
+// Return the number of items in the stream.
+template<class T>
+off_t AMI_STREAM<T>::stream_len(void) {
+
+  fflush(fp);
+
+  struct stat buf;
+  if (stat(path, &buf) == -1) {
+    perror("AMI_STREAM::stream_len(): fstat failed ");
+    assert(0);
+    exit(1);
+  }
+
+  //fprintf(stderr, "%s: length = %lld\n", path, buf.st_size);
+
+  return (buf.st_size / sizeof(T));
+};
+
+
+
+/**********************************************************************/
+// Return the path name of this stream.
+template<class T>
+AMI_err AMI_STREAM<T>::name(char **stream_name)  {
+  
+  *stream_name = new char [strlen(path) + 1];
+  strcpy(*stream_name, path);
+  
+  return AMI_ERROR_NO_ERROR;
+};
+
+
+
+/**********************************************************************/
+// Move to a specific offset within the (sub)stream.
+template<class T>
+AMI_err AMI_STREAM<T>::seek(off_t offset) {
+
+  off_t seek_offset;
+  
+  if (substream_level) {
+    //substream
+    if (offset  > (unsigned) (logical_eos - logical_bos)) {
+      //offset out of range
+      cerr << "AMI_STREAM::seek bos=" << logical_bos << ", eos=" << logical_eos
+	   << ", offset " << offset << " out of range.\n";
+      assert(0);
+      exit(1);
+    } else {
+      //offset in range 
+      seek_offset = (logical_bos + offset) * sizeof(T);
+    }
+
+
+  } else {
+    //not a substream
+    seek_offset = offset * sizeof(T);
+  }
+
+  if (fseek(fp, seek_offset, SEEK_SET) == -1) {
+    cerr << "AMI_STREAM::seek offset=" << seek_offset << "failed.\n";
+    assert(0);
+    exit(1);
+  }
+  
+  return AMI_ERROR_NO_ERROR;
+}
+
+
+
+
+/**********************************************************************/
+// Query memory usage
+template<class T>
+AMI_err AMI_STREAM<T>::main_memory_usage(size_t *usage,
+					 MM_stream_usage usage_type= MM_STREAM_USAGE_OVERHEAD) {
+  
+   switch (usage_type) {
+   case MM_STREAM_USAGE_OVERHEAD:
+     *usage = sizeof (*this);
+     break;
+   case MM_STREAM_USAGE_BUFFER:
+     // *usage = get_block_length();
+     *usage = STREAM_BUFFER_SIZE*sizeof(char);
+     break;
+   case MM_STREAM_USAGE_CURRENT:
+   case MM_STREAM_USAGE_MAXIMUM:
+     // *usage = sizeof (*this) + get_block_length();
+     *usage = sizeof (*this) + STREAM_BUFFER_SIZE*sizeof(char);
+     break;
+   }
+   return AMI_ERROR_NO_ERROR;
+};
+
+
+
+/**********************************************************************/
+template<class T>
+AMI_STREAM<T>::~AMI_STREAM(void)  {
+
+  DEBUG_DELETE cerr << "~AMI_STREAM: " << path << "(" << this << ")\n";
+  delete buf;
+  assert(fp);
+  fclose(fp);
+  
+  // Get rid of the file if not persistent and if not substream.
+  if ((per != PERSIST_PERSISTENT) && (substream_level == 0)) {
+    if (remove(path) == -1) {
+      cerr << "AMI_STREAM: failed to unlink " << path << endl;
+      perror("cannot unlink ");
+      assert(0);
+      exit(1);
+    }
+  }
+  // Register memory deallocation before returning.
+  //size_t usage; 
+  //main_memory_usage(&usage,  MM_STREAM_USAGE_CURRENT);
+  //MM_manager.register_deallocation(usage);
+ };
+
+
+
+/**********************************************************************/
+template<class T>
+AMI_err AMI_STREAM<T>::read_item(T **elt)  {
+
+  assert(fp);
+  //if we go past substream range
+  if ((logical_eos >= 0) && ftell(fp) >= sizeof(T) * logical_eos) {
+    return AMI_ERROR_END_OF_STREAM;
+  
+  } else {
+    if (fread((char *) (&read_tmp), sizeof(T), 1, fp) < 1) {
+      //cerr << "file=" << path << ":";
+      //perror("cannot read!");
+      //assume EOF --should fix this XXX
+      return AMI_ERROR_END_OF_STREAM;
+    }
+    
+    *elt = &read_tmp;
+    return AMI_ERROR_NO_ERROR; 
+  }
+};
+
+
+
+
+/**********************************************************************/
+template<class T>
+AMI_err AMI_STREAM<T>::read_array(T *data, off_t len) {
+  
+  assert(fp);
+  
+  //if we go past substream range
+  if ((logical_eos >= 0) && ftell(fp) >= sizeof(T) * logical_eos) {
+    return AMI_ERROR_END_OF_STREAM;
+    
+  } else {
+	if (fread((void*)data, sizeof(T), len, fp) < len) {
+	  cerr << "file=" << path << ":";
+	  perror("cannot read!");    
+	  //assume EOF --should fix this XXX
+	  return AMI_ERROR_END_OF_STREAM;
+	}
+    return AMI_ERROR_NO_ERROR; 
+  }
+};
+
+
+
+
+/**********************************************************************/
+template<class T>
+AMI_err AMI_STREAM<T>::write_item(const T &elt) {
+
+  assert(fp);
+  //if we go past substream range
+  if ((logical_eos >= 0) && ftell(fp) >= sizeof(T) * logical_eos) {
+    return AMI_ERROR_END_OF_STREAM;
+  
+  } else {
+    if (fwrite((char*)(&elt), sizeof(T), 1,fp) < 1) {
+      cerr << "AMI_STREAM::write_item failed.\n";
+      assert(0);
+      exit(1);
+    }
+    return AMI_ERROR_NO_ERROR;
+  }
+};
+
+
+/**********************************************************************/
+template<class T>
+AMI_err AMI_STREAM<T>::write_array(const T *data, off_t len) {
+
+  assert(fp);
+  //if we go past substream range
+  if ((logical_eos >= 0) && ftell(fp) >= sizeof(T) * logical_eos) {
+    return AMI_ERROR_END_OF_STREAM;
+    
+  } else {
+    if (fwrite(data, sizeof(T), len,fp) < len) {
+      cerr << "AMI_STREAM::write_item failed.\n";
+      assert(0);
+      exit(1);
+    }
+    return AMI_ERROR_NO_ERROR;
+  }
+};
+        
+
+/**********************************************************************/
+template<class T>
+void AMI_STREAM<T>::persist(persistence p)  {
+   per = p;
+};
+
+
+
+/**********************************************************************/
+// sprint()
+// Return a string describing the stream
+//
+// This function gives easy access to the file name, length.
+// It is not reentrant, but this should not be too much of a problem 
+// if you are careful.
+template<class T>
+char *AMI_STREAM<T>::sprint()  {
+  static char buf[BUFSIZ];
+  sprintf(buf, "[AMI_STREAM %s %ld]", path, (long)stream_len());
+  return buf;
+};
+
+#endif // _AMI_STREAM_H 

Copied: grass/trunk/include/iostream/embuffer.h (from rev 32508, grass/trunk/raster/r.terraflow/IOStream/include/embuffer.h)
===================================================================
--- grass/trunk/include/iostream/embuffer.h	                        (rev 0)
+++ grass/trunk/include/iostream/embuffer.h	2008-08-04 12:21:11 UTC (rev 32509)
@@ -0,0 +1,1311 @@
+/****************************************************************************
+ * 
+ *  MODULE:	r.terraflow
+ *
+ *  COPYRIGHT (C) 2007 Laura Toma
+ *   
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *****************************************************************************/
+
+#ifndef __EMBUFFER_H
+#define __EMBUFFER_H
+
+
+#include <stdio.h>
+#include <math.h>
+#include <assert.h>
+#include <stdlib.h>
+
+#include "ami_config.h" //for SAVE_MEMORY
+#include "ami_stream.h"
+#include "mm.h"
+#include "mm_utils.h"
+#include "pqheap.h"
+
+
+
+
+#define MY_LOG_DEBUG_ID(x) //inhibit debug printing
+//#define MY_LOG_DEBUG_ID(x) LOG_DEBUG_ID(x)
+
+
+
+/**********************************************************
+                  DEBUGGING FLAGS 
+***********************************************************/
+
+//setting this enables checking that the streams/arrays inserted in
+//buffers are sorted in increasing order
+//#define EMBUF_CHECK_INSERT
+
+//enable checking that stream name is the same as the one stored in
+//the buffer name[]
+//#define EMBUF_CHECK_NAME
+
+//enable printing names as they are checked
+//#define EMBUF_CHECK_NAME_PRINT
+
+//enable printing when streams in  a buffer are shifted left to 
+//check that names are shifted accordingly
+//#define EMBUF_DELETE_STREAM_PRINT
+
+//enable printing the name of the stream which is inserted in buff
+//#define EMBUF_PRINT_INSERT
+
+//enable printing the stream names/sizes in cleanup()
+//#define EMBUF_CLEANUP_PRINT
+
+//enable printing when get/put_stream is called (for each stream)
+//#define EMBUF_PRINT_GETPUT_STREAM
+
+//enable printing when get/put_streams is called 
+//#define EMBUF_PRINT_GETPUT_STREAMS
+
+/***********************************************************/
+
+
+
+
+
+/*****************************************************************/
+/* encapsulation of the key together with stream_id; used during
+   stream merging to save space;
+*/
+template<class KEY> 
+class merge_key {
+public:
+  KEY k;
+  unsigned int str_id; //id of the stream where key comes from
+
+public:
+  merge_key(): str_id(0) {}
+
+  merge_key(const KEY &x, const unsigned int sid):
+    k(x), str_id(sid) {}
+  
+  ~merge_key() {}
+  
+  void set(const KEY &x, const unsigned int sid) {
+    k = x;
+    str_id = sid;
+  }
+  KEY key()  const {
+    return k; 
+  }
+  unsigned int stream_id() const  {
+    return str_id;
+  }
+  KEY getPriority()  const {
+    return k;
+  }
+
+  friend ostream& operator<<(ostream& s, const merge_key<KEY> &x) {
+    return s << "<str_id=" << x.str_id << "> " << x.k << " ";
+  }
+  friend int operator < (const merge_key &x, 
+				const merge_key &y) {
+    return (x.k < y.k);
+  }
+  friend int operator <= (const merge_key &x, 
+				 const merge_key &y) {
+    return (x.k <= y.k);
+  }
+  friend int operator > (const merge_key &x, 
+				const merge_key &y) {
+    return (x.k > y.k);
+  }
+  friend int operator >= (const merge_key &x, 
+				 const merge_key &y) {
+    return (x.k >= y.k);
+  }
+  friend int operator != (const merge_key &x, 
+				 const merge_key &y) {
+    return (x.k != y.k);
+  }
+  friend int operator == (const merge_key &x, 
+				 const merge_key &y) {
+    return (x.k == y.k);
+  }
+  friend merge_key operator + (const  merge_key &x, 
+				      const  merge_key &y) {
+    assert(0);
+    return x;
+    //  Key sum = x.k + y.k;
+    //  merge_key f(sum, x.str_id);
+    //  return f;
+  }
+
+};
+
+
+
+
+
+
+/***************************************************************** 
+ ***************************************************************** 
+ ***************************************************************** 
+
+ external_memory buffer
+ 
+ Each level-i buffer can store up to <arity>^i * <basesize> items,
+ where tipically <arity> is \theta(m) and <basesize> is \theta(M);
+ therefore log_m{n/m} buffers are needed to store N items, one
+ buffer for each level 1..log_m{n/m}. All buffers must have same
+ values or <arity> and <basesize>.
+ 
+ Functionality:
+ 
+ A level-i on-disk buffer stores <arity>^i * <basesize> items of
+ data, organized in <arity> streams of <arity>^{i-1} items each;
+ <basesize> is same for all buffers and equal to the size of the
+ level 0 buffer (in memory buffer).
+ 
+ Invariant: all the <arity> streams of a level-i buffer are in
+ sorted order; in this way sorting the buffer is done by merging the
+ <arity> streams in linear time. 
+ 
+ Items are inserted in level i-buffer only a whole stream at a time
+ (<arity>^{i-1}*<basesize> items). When all the <arity> streams of
+ the buffer are full, the buffer is sorted and emptied into a stream
+ of a level (i+1)-buffer. 
+ 
+ The <arity> streams of a buffer are allocated contigously from left
+ to r ight. The unused streams are NULL; The buffer keeps the index of
+ the last used(non-NULL) stream. When a buffer becomes full and is
+ empty, all its buffers are set to NULL.
+
+ ***************************************************************** 
+ ***************************************************************** 
+ ***************************************************************** */
+
+/* T is a type with priority of type K and method getPriority() */
+template<class T, class Key> 
+class em_buffer {
+private: 
+
+  //number of streams in a buffer;
+  unsigned int arity;
+  
+  //level of buffer: between 1 and log_arity{n/arity}; (level-0 buffer
+  //has a slightly different behaviour so it is implemented as a
+  //different class <im_buffer>)
+  unsigned short level;
+
+  //level-i buffer contains m streams of data, each of size
+  //arity^{i-1}*basesize;
+  AMI_STREAM<T>** data;
+  
+  //the buffers can be depleted to fill the internal pq;
+  //keep an array which counts, for each stream, how many elements 
+  //have been deleted (implicitely from the begining of stream)
+  long* deleted;
+
+  //nb of items in each substream; this can be found out by calling
+  //stream_len() on the stream, but it is more costly esp in the case
+  //when streams are on disk and must be moved in and out just to find
+  //stream length; streamsize is set only at stream creation, and the
+  //actual size must substract the number of iteme deleted from the
+  //bos
+  unsigned long* streamsize;
+  
+  //index of the next available(empty) stream (out of the total m
+  //streams in the buffer);
+  unsigned int index;
+  
+  //nb of items in a stream of level_1 buffer
+  unsigned long basesize;
+
+
+public:
+
+  //create a level-i buffer of given basesize;
+  em_buffer(const unsigned short i, const unsigned long bs, 
+			const unsigned int ar);
+  
+  //copy constructor;
+  em_buffer(const em_buffer &buf);
+
+  //free the stream array and the streams pointers
+  ~em_buffer();
+  
+  //return the level of the buffer;
+  unsigned short get_level() const { return level;}
+
+  //return the ith stream (load stream in memory)
+  AMI_STREAM<T>* get_stream(unsigned int i);
+  
+  //return a pointer to the streams of the buffer (loads streams in
+  //memory)
+  AMI_STREAM<T>** get_streams();
+
+  //put the ith stream back to disk
+  void put_stream(unsigned int i);
+  
+  //called in pair with get_streams to put all streams back to disk
+  void put_streams();
+
+  //return a pointer to the array of deletion count for each stream
+  long* get_bos() const { return deleted;}
+  
+  //return the index of the last stream in buffer which contains data;
+  unsigned int laststream() const { return index -1;}
+
+  //return the index of the next available stream in the buffer
+  unsigned int nextstream() const { return index;}
+
+  //increment the index of the next available stream in the buffer
+  void incr_nextstream() { ++index;}
+
+  //return nb of (non-empty) streams in buffer
+  unsigned int get_nbstreams() const { return index;}
+  
+  //return arity
+  unsigned int get_arity() const { return arity;}
+
+  //return total nb of deleted elements in all active streams of the buffer
+  long total_deleted() const {
+    long tot = 0;
+    for (unsigned int i=0; i< index; i++) {
+      tot += deleted[i];
+    }
+    return tot;
+  }
+  
+  //mark as deleted one more element from i'th stream
+  void incr_deleted(unsigned int i) {
+    assert(i<index);
+    deleted[i]++;
+  }
+
+
+  //return the nominal size of a stream (nb of items): 
+  //arity^{level-1}*basesize;
+  unsigned long get_stream_maxlen() const {
+    return (unsigned long)pow((double)arity,(double)level-1)*basesize;
+  }
+
+  //return the actual size of stream i; i must be the index of a valid
+  //stream
+  unsigned long get_stream_len(unsigned int i) {
+    //assert(i>= 0 && i<index);
+    return streamsize[i] - deleted[i];
+  }
+
+  //return the total current size of the buffer; account for the
+  //deleted elements;
+  unsigned long get_buf_len() {
+    unsigned long tot = 0;
+    for (unsigned int i=0; i< index; i++) {
+      tot += get_stream_len(i);
+    }
+    return tot;
+  }
+  
+  //return the total maximal capacity of the buffer
+  unsigned long get_buf_maxlen() {
+    return arity * get_stream_maxlen();
+  }
+ 
+  //return true if buffer is empty (all streams are empty)
+  bool is_empty() {
+    return ((nextstream() == 0) || (get_buf_len() == 0));
+  }
+  
+  //return true if buffer is full(all streams are full)
+  bool is_full() const {
+    return (nextstream() == arity);
+  }
+  
+  //reset
+  void reset();
+  
+  //clean buffer: in case some streams have been emptied by deletion
+  //delete them and shift streams left;
+  void cleanup();
+  
+
+  //create and return a stream which contains all elements of all
+  //streams of the buffer in sorted ascending order of their
+  //keys(priorities);
+  AMI_STREAM<T>* sort();
+
+
+  // insert an array into the buffer; can only insert one
+  // level-i-full-stream-len nb of items at a time; assume the length
+  // of the array is precisely the streamlen of level-i buffer n =
+  // (pow(arity,level-1)*basesize); assume array is sorted; return the
+  // number of items actually inserted
+  long insert(T* a, long n); 
+
+
+  // insert a stream into the buffer; assume the length of the stream
+  // is precisely the streamlen of level-i buffer n =
+  // (pow(arity,level-1)*basesize); the <nextstream> pointer of buffer
+  // is set to point to the argument stream; (in this way no stream
+  // copying is done, just one pointer copy). The user should be aware
+  // the the argument stream is 'lost' - that is a stream cannot be
+  // inserted repeatedly into many buffers because this would lead to
+  // several buffers pointing to the same stream.
+
+  // stream is assumed sorted; bos = how many elements are deleted
+  // from the begining of stream;
+     
+  // return the number of items actually inserted 
+  long insert(AMI_STREAM<T>* str, 
+	      //long bos=0); 
+	      long bos); 
+  
+  //print range of elements in buffer
+  void print_range();
+  
+  //print all elements in buffer
+  void print();
+  
+ //prints the sizes of the streams in the buffer
+  void print_stream_sizes();
+ 
+  //print the elements in the buffer
+  friend ostream& operator<<(ostream& s,  em_buffer &b) {
+    s << "BUFFER_" << b.level << ": ";
+    if (b.index ==0) {
+      s << "[]";
+    } 
+    s << "\n";
+    b.get_streams();
+    for (unsigned int i=0; i < b.index; i++) {
+      b.print_stream(s, i);
+    }
+    b.put_streams();
+    return s;
+  }
+  
+ 
+private:
+
+  // merge the input streams; there are <arity> streams in total;
+  // write output in <outstream>; the input streams are assumed sorted
+  // in increasing order of their keys;
+  AMI_err substream_merge(AMI_STREAM<T>** instreams, 
+			  unsigned int arity, 
+			  AMI_STREAM<T> *outstream); 
+
+  
+  //print to stream the elements in i'th stream
+  void print_stream(ostream& s, unsigned int i); 
+
+
+
+#ifdef SAVE_MEMORY
+  //array of names of streams; 
+  char** name;
+
+  //return the designated name for stream i
+  char* get_stream_name(unsigned int i) const;
+ 
+  //print all stream names in buffer
+  void print_stream_names();
+
+
+  //checks that name[i] is the same as stream name; stream i must be in
+  //memory (by a previous get_stream call, for instance) in order to
+  //find its length
+  void check_name(unsigned int i);
+#endif
+
+};
+
+
+/************************************************************/
+//create a level-i buffer of given basesize;
+template <class T, class Key>
+em_buffer<T,Key>::em_buffer(const unsigned short i, const unsigned long bs, 
+							const unsigned int ar) : 
+  arity(ar), level(i),  basesize(bs) {
+
+  assert((level>=1) && (basesize >=0));
+ 
+  char str[100];
+  sprintf(str, "em_buffer: allocate %d AMI_STREAM*, total %ld\n",
+	  arity, (long)(arity*sizeof(AMI_STREAM<T>*)));
+  MEMORY_LOG(str);
+  //allocate STREAM* array
+  //GCC-3.4 does not allow (TYPE)[array] 
+  //use TYPE[array]
+  data = new AMI_STREAM<T>*[arity];
+   
+  //allocate deleted array
+  sprintf(str, "em_buffer: allocate deleted array: %ld\n",
+		  (long)(arity*sizeof(long)));
+  MEMORY_LOG(str);
+  deleted = new long[arity];
+ 
+  //allocate streamsize array 
+  sprintf(str, "em_buffer: allocate streamsize array: %ld\n",
+		  (long)(arity*sizeof(long)));
+  MEMORY_LOG(str);
+  streamsize = new unsigned long[arity];
+ 
+#ifdef SAVE_MEMORY
+  //allocate name array
+  sprintf(str, "em_buffer: allocate name array: %ld\n",
+		  (long)(arity*sizeof(char*)));
+  MEMORY_LOG(str);
+  //GCC-3.4 does not allow (TYPE)[array] 
+  //use TYPE[array]
+  //name = new (char*)[arity];
+  name = new char*[arity];
+  assert(name);
+#endif
+
+  //assert data
+  if ((!data) || (!deleted) || (!streamsize)) {
+    cerr << "em_buffer: cannot allocate\n";
+    exit(1);
+  }
+  
+  //initialize the <arity> streams to NULL, deleted[], streamsize[]
+  //and name[]
+  for (unsigned int i=0; i< arity; i++) {
+    data[i] = NULL;
+    deleted[i] = 0;
+    streamsize[i] = 0;
+#ifdef SAVE_MEMORY
+    name[i] = NULL;
+#endif
+  }   
+  //set index
+  index = 0;
+
+#ifdef SAVE_MEMORY  
+  //streams_in_memory = false;
+#endif
+}
+
+
+/************************************************************/
+//copy constructor;
+template<class T, class Key>
+em_buffer<T,Key>::em_buffer(const em_buffer &buf): 
+  level(buf.level), basesize(buf.basesize), 
+  index(buf.index), arity(buf.arity) {
+
+  assert(0);//should not get called
+
+  MEMORY_LOG("em_buffer: copy constr start\n");
+  get_streams();
+  for (unsigned int i=0; i< index; i++) {
+    assert(data[i]);
+    delete data[i]; //delete old stream if existing 
+    data[i] = NULL;
+    
+    //call copy constructor; i'm not sure that it actually duplicates
+    //the stream and copies the data; should that in the BTE
+    //sometimes..
+    data[i] = new AMI_STREAM<T>(*buf.data[i]);
+    deleted[i] = buf.deleted[i];
+    streamsize[i] = buf.streamsize[i];
+#ifdef SAVE_MEMORY
+    assert(name[i]);
+    delete name[i];
+    name[i] = NULL;
+    name[i] = buf.name[i];
+#endif
+  }
+  put_streams();
+  MEMORY_LOG("em_buffer: copy constr end\n");
+}
+
+
+/************************************************************/
+//free the stream array and the streams pointers
+template<class T, class Key>
+em_buffer<T,Key>::~em_buffer() {
+
+  assert(data);
+  //delete the m streams in the buffer
+  get_streams();
+  for (unsigned int i=0; i<index; i++) {
+    assert(data[i]);
+#ifdef SAVE_MEMORY
+    check_name(i);
+    delete name[i]; 
+#endif
+    delete data[i]; 
+    data[i] = NULL;
+  }
+  
+  delete [] data;
+  delete [] deleted;
+  delete [] streamsize;
+#ifdef SAVE_MEMORY  
+  delete [] name;
+#endif
+}
+
+
+#ifdef SAVE_MEMORY
+/************************************************************/
+//checks that name[i] is the same as stream name; stream i must be in
+//memory (by a previous get_stream call, for instance) in order to
+//find its length
+template<class T, class Key>
+void em_buffer<T,Key>::check_name(unsigned int i) {
+
+#ifdef EMBUF_CHECK_NAME
+  assert(i>=0 && i < index);
+  assert(data[i]);
+
+  char* fooname;
+  data[i]->name(&fooname);//name() allocates the string
+#ifdef EMBUF_CHECK_NAME_PRINT
+  cout << "::check_name: checking stream [" << level << "," << i << "] name:" 
+       << fooname << endl; 
+  cout.flush();
+#endif
+  if (strcmp(name[i], fooname) != 0) {
+    cerr << "name[" << i << "]=" << name[i]
+	 << ", streamname=" << fooname << endl;
+  } 
+  assert(strcmp(fooname, name[i]) == 0);
+  delete fooname;
+#endif  
+}
+#endif
+
+
+
+
+/************************************************************/
+//if SAVE_MEMORY flag is set, load the stream in memory; return the
+//ith stream
+template<class T, class Key>
+AMI_STREAM<T>* em_buffer<T,Key>::get_stream(unsigned int i) {
+
+  assert(i>=0 && i < index);
+     
+#ifdef SAVE_MEMORY
+  MY_LOG_DEBUG_ID("em_buffer::get_stream"); 
+  MY_LOG_DEBUG_ID(i);
+  
+  if (data[i] == NULL) {
+
+    //stream is on disk, load it in memory
+    assert(name[i]);
+    MY_LOG_DEBUG_ID("load stream in memory");
+    MY_LOG_DEBUG_ID(name[i]);
+  
+#ifdef EMBUF_PRINT_GETPUT_STREAM
+    cout << "get_stream:: name[" << i << "]=" << name[i] << " from disk\n"; 
+    cout.flush();
+#endif
+    
+    //assert that file exists
+    FILE* fp;
+    if ((fp = fopen(name[i],"rb")) == NULL) {
+      cerr << "get_stream: checking that stream " << name[i] << "exists\n";
+      perror(name[i]);
+      assert(0);
+      exit(1);
+    }
+    fclose(fp);
+
+    //create an AMI_STREAM from file
+    data[i] = new AMI_STREAM<T>(name[i]);
+    assert(data[i]);
+
+  } else {
+
+    //if data[i] not NULL, stream must be already in memory    
+    MY_LOG_DEBUG_ID("stream not NULL");
+    MY_LOG_DEBUG_ID(data[i]->sprint());
+  }
+#endif
+  
+
+  //NOW STREAM IS IN MEMORY
+
+  //some assertion checks
+  assert(data[i]);
+  assert(data[i]->stream_len() == streamsize[i]);
+#ifdef SAVE_MEMORY
+  check_name(i);
+#endif
+
+  return data[i];
+}
+
+
+
+
+/************************************************************/
+//if SAVE_MEMORY flag is set, put the i'th stream back on disk
+template<class T, class Key>
+void em_buffer<T,Key>::put_stream(unsigned int i) {
+
+  assert(i>=0 && i < index);
+
+#ifdef SAVE_MEMORY
+  MY_LOG_DEBUG_ID("em_buffer::put_stream");
+  MY_LOG_DEBUG_ID(i);
+  
+  if (data[i] != NULL) {
+
+    //stream is in memory, put it on disk
+    MY_LOG_DEBUG_ID("stream put to disk");
+    MY_LOG_DEBUG_ID(data[i]->sprint());
+
+    check_name(i);
+#ifdef EMBUF_PRINT_GETPUT_STREAM
+    cout << "put_stream:: name[" << i << "]=" << name[i] << " to disk\n"; 
+    cout.flush();
+#endif
+  
+    //make stream persistent and delete it
+    data[i]->persist(PERSIST_PERSISTENT);
+    delete data[i]; 
+    data[i] = NULL;
+
+  } else {
+
+    //data[i] is NULL, so stream must be already put on disk
+    MY_LOG_DEBUG_ID("stream is NULL");
+  }
+#endif 
+  
+}
+
+
+
+
+/************************************************************/
+//return a pointer to the streams of the buffer
+template<class T, class Key>
+AMI_STREAM<T>** em_buffer<T,Key>::get_streams() { 
+
+#ifdef SAVE_MEMORY
+  MY_LOG_DEBUG_ID("em_buffer::get_streams: reading streams from disk");
+#ifdef EMBUF_PRINT_GETPUT_STREAMS
+  cout << "em_buffer::get_streams (buffer " << level <<")"; 
+  cout << ": index = " << index << "(arity=" << arity << ")\n";
+  cout.flush();
+#endif
+
+  for (unsigned int i=0; i<index; i++) {
+    get_stream(i);
+    assert(data[i]);
+  }
+
+#endif
+
+  return data;
+}
+
+
+
+
+/************************************************************/
+//called in pair with load_streams to store streams on disk
+//and release the memory
+template<class T, class Key>
+void em_buffer<T,Key>::put_streams() { 
+
+#ifdef SAVE_MEMORY
+  MY_LOG_DEBUG_ID("em_buffer::put_streams: writing streams on disk");
+#ifdef EMBUF_PRINT_GETPUT_STREAMS
+  cout << "em_buffer::put_streams (buffer " << level <<")"; 
+  cout << ": index = " << index << "(arity=" << arity << ")\n";
+  cout.flush();
+#endif
+
+  for (unsigned int i=0; i<index; i++) {
+    put_stream(i);
+    assert(data[i] == NULL);
+  }
+#endif
+
+}
+
+
+
+#ifdef SAVE_MEMORY
+/************************************************************/
+//return the name of the ith stream
+template<class T, class Key>
+char* em_buffer<T,Key>::get_stream_name(unsigned int i) const {
+  
+  assert(i>=0 && i<index);
+  assert(name[i]);
+  return name[i];
+}
+#endif
+
+
+
+
+#ifdef SAVE_MEMORY
+/************************************************************/
+template<class T, class Key>
+void em_buffer<T,Key>::print_stream_names() {
+  unsigned int i;
+  for (i=0; i<index; i++) {
+    assert(name[i]);
+    cout << "stream " << i << ": " << name[i] << endl;
+  }
+  cout.flush();
+}
+#endif
+
+
+
+
+/************************************************************/
+//clean buffer in case some streams have been emptied by deletion
+template<class T, class Key>
+void em_buffer<T,Key>::cleanup() {
+ 
+  MY_LOG_DEBUG_ID("em_buffer::cleanup()");
+#ifdef EMBUF_CLEANUP_PRINT
+#ifdef SAVE_MEMORY
+  if (index>0) {
+    cout << "before cleanup:\n";
+    print_stream_names();
+    print_stream_sizes();  
+    cout.flush();
+  }
+#endif
+#endif
+  
+  //load all streams in memory
+  get_streams(); 
+
+  //count streams of size=0
+  unsigned int i, empty=0;
+  for (i=0; i<index; i++) {
+   
+    if (get_stream_len(i) == 0) {
+      //printing..
+#ifdef EMBUF_DELETE_STREAM_PRINT      
+      cout<<"deleting stream [" << level << "," << i <<"]:" ;
+#ifdef SAVE_MEMORY
+      cout  << name[i]; 
+#endif
+      cout << endl;
+      cout.flush();
+#endif
+      
+#ifdef SAVE_MEMORY
+      //stream is empty ==> delete its name 
+      delete name[i];
+      name[i] = NULL;
+#endif
+
+      //stream is empty ==> reset data
+      assert(data[i]); 
+      //data[i]->persist(PERSIST_DELETE); //this is done automatically..
+      delete data[i]; 
+      data[i] = NULL;
+      deleted[i] = 0;
+      streamsize[i] = 0;
+      empty++;
+    }
+  }
+  //streams are in memory; all streams which are NULL must have been
+  //deleted
+
+  //shift streams to the left in case holes were introduced
+  unsigned int j=0;
+  if (empty) {
+#ifdef EMBUF_DELETE_STREAM_PRINT 
+    cout << "em_buffer::cleanup: shifting streams\n"; cout.flush();
+#endif
+    for (i=0; i<index; i++) {
+      //if i'th stream is not empty, shift it left if necessary
+      if (data[i]) {
+	if (i!=j) {
+	  //set j'th stream to point to i'th stream
+	  //cout << j << " set to " << i << endl; cout.flush();
+	  data[j] = data[i];
+	  deleted[j] = deleted[i];
+	  streamsize[j] = streamsize[i];
+	  //set i'th stream to point to NULL
+	  data[i] = NULL;
+	  deleted[i] = 0;
+	  streamsize[i] = 0;
+#ifdef SAVE_MEMORY
+	  //fix the names
+	  delete name[j];
+	  name[j] = name[i];
+	  name[i] = NULL;
+	  check_name(j);
+#endif
+	}  else {
+	  //cout << i << " left the same" << endl;
+	}
+	j++;
+      } //if data[i] != NULL
+    }//for i
+
+    //set the index 
+    assert(index == j + empty);
+    index = j;
+    
+#ifdef EMBUF_DELETE_STREAM_PRINT 
+    cout << "em_buffer::cleanup: index set to " << index << endl;
+    cout.flush();
+#endif  
+  } //if empty
+
+  //put streams back to disk
+  put_streams();
+
+#ifdef EMBUF_CLEANUP_PRINT
+#ifdef SAVE_MEMORY
+  if (index >0) {
+    cout << "after cleanup:\n";
+    print_stream_names();
+    print_stream_sizes();  
+    cout.flush();
+  }
+#endif
+#endif
+}
+
+
+
+
+/************************************************************/
+//delete all streams
+template<class T, class Key>
+void em_buffer<T,Key>::reset() {
+  
+  get_streams();
+  
+  //make streams not-persistent and delete them
+  for (unsigned int i=0; i<index; i++) {
+    assert(data[i]);
+    assert(streamsize[i] == data[i]->stream_len()); 
+#ifdef SAVE_MEMORY   
+    check_name(i);
+    delete name[i];
+    name[i] = NULL;
+#endif
+    
+    data[i]->persist(PERSIST_DELETE);
+
+    delete data[i]; 
+    data[i] = NULL;
+    deleted[i] = 0;
+    streamsize[i] = 0;
+  }
+  
+  index = 0;
+}
+
+
+
+/************************************************************/
+//create and return a stream which contains all elements of 
+//all streams of the buffer in sorted ascending order of 
+//their keys  (priorities); 
+template<class T, class Key>
+AMI_STREAM<T>* em_buffer<T,Key>::sort() {
+  
+  //create stream
+  MEMORY_LOG("em_buffer::sort: allocate new AMI_STREAM\n");
+
+  AMI_STREAM<T>* sorted_stream = new AMI_STREAM<T>();
+  assert(sorted_stream);
+  
+  //merge the streams into sorted stream
+  AMI_err aerr;
+  //Key dummykey;
+  //must modify this to seek after deleted[i] elements!!!!!!!!!!!!!
+  // aerr = MIAMI_single_merge_Key(data, arity, sorted_stream, 
+  // 				  0, dummykey);
+  //could not use AMI_merge so i had to write my own..
+
+  get_streams();
+
+  aerr = substream_merge(data, arity, sorted_stream);
+  assert(aerr == AMI_ERROR_NO_ERROR);
+ 
+  put_streams();
+  
+  return sorted_stream;
+}
+
+
+  
+
+/************************************************************/
+/* merge the input streams; there are <arity> streams in total; write
+   output in <outstream>;
+   
+   the input streams are assumed sorted in increasing order of their
+   keys;
+   
+   assumes the instreams are in memory (no need for get_streams()) */
+template<class T, class Key>
+AMI_err em_buffer<T,Key>::substream_merge(AMI_STREAM<T>** instreams,
+					  unsigned int arity,
+					  AMI_STREAM<T> *outstream) {
+  
+  unsigned int i, j;
+  
+  //some assertion checks
+  assert(instreams);
+  assert(outstream);
+  for (i = 0; i < arity ; i++ ) {
+    assert(instreams[i]);
+#ifdef SAVE_MEMORY    
+    check_name(i);
+#endif
+  }
+
+  T* in_objects[arity]; //pointers to current leading elements of streams
+  AMI_err ami_err;
+  
+ 
+  char str[200];
+  sprintf(str, "em_buffer::substream_merge: allocate keys array, total %ldB\n",
+		  (long)((long)arity * sizeof(merge_key<Key>)));
+  MEMORY_LOG(str);
+
+ 
+  //keys array is initialized with smallest key from each stream (only
+  //non-null keys must be included) 
+  merge_key<Key>* keys;
+  //merge_key<Key>* keys = new (merge_key<Key>)[arity];
+  typedef merge_key<Key> footype;
+  keys = new footype[arity];
+  assert(keys);
+    
+  //count number of non-empty streams
+  j = 0; 
+  //rewind and read the first item from every stream initializing
+  //in_objects and keys
+  for (i = 0; i < arity ; i++ ) {
+    assert(instreams[i]);
+    //rewind stream
+    if ((ami_err = instreams[i]->seek(deleted[i])) != AMI_ERROR_NO_ERROR) {
+      return ami_err;
+    }
+    //read first item from stream
+    if ((ami_err = instreams[i]->read_item(&(in_objects[i]))) !=
+	AMI_ERROR_NO_ERROR) {
+      if (ami_err == AMI_ERROR_END_OF_STREAM) {
+	in_objects[i] = NULL;
+      } else {
+	return ami_err;
+      }
+    } else {
+      //include this key in the array of keys
+      Key k = in_objects[i]->getPriority();
+      keys[j].set(k, i);
+      j++; 
+    }
+  }
+  unsigned int NonEmptyRuns = j;
+
+  //build heap from the array of keys 
+  pqheap_t1<merge_key<Key> > mergeheap(keys, NonEmptyRuns);
+
+  //repeatedly extract_min from heap, write it to output stream and
+  //insert next element from same stream
+  merge_key<Key> minelt;
+  //rewind output buffer
+  ami_err = outstream->seek(0);
+  assert(ami_err == AMI_ERROR_NO_ERROR);
+  while (!mergeheap.empty()) {
+    //find min key and id of the stream from whereit comes
+    mergeheap.min(minelt);
+    i = minelt.stream_id();
+    //write min item to output stream
+    if ((ami_err = outstream->write_item(*in_objects[i])) 
+	!= AMI_ERROR_NO_ERROR) {
+      return ami_err;
+    }
+    //read next item from same input stream
+    if ((ami_err = instreams[i]->read_item(&(in_objects[i])))
+	!= AMI_ERROR_NO_ERROR) {
+      if (ami_err != AMI_ERROR_END_OF_STREAM) {
+	return ami_err;
+      }
+    }
+    //extract the min from the heap and insert next key from same stream
+    if (ami_err == AMI_ERROR_END_OF_STREAM) {
+      mergeheap.delete_min();
+    } else {
+      Key k = in_objects[i]->getPriority();
+      merge_key<Key> nextit(k, i);
+      mergeheap.delete_min_and_insert(nextit);
+    }
+  } //while
+  
+  //delete [] keys; 
+  //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
+  //DESTRUCTOR OF MERGEHEAP (CALLED AUUTOMATICALLY ON FUNCTION EXIT) IF
+  //I DELETE KEYS EXPLICITELY, THEY WILL BE DELETED AGAIN BY DESTRUCTOR,
+  //AND EVERYTHING SCREWS UP..
+  
+  return AMI_ERROR_NO_ERROR;
+}
+
+
+
+
+
+/************************************************************/
+// insert an array into the buffer; assume array is sorted; return the
+// number of items actually inserted; if SAVE_MEMORY FLAG is on, put
+// stream on disk and release its memory
+template<class T, class Key>
+long em_buffer<T,Key>::insert(T* a, long n) {
+
+  assert(a);
+
+  if (is_full()) {
+    cout << "em_buffer::insert: buffer full\n";
+    return 0;
+  }
+  
+  //can only insert one full stream at a time
+  //relaxed..
+  //assert(n == get_stream_maxlen());
+  
+  //create the stream
+  MEMORY_LOG("em_buffer::insert(from array): allocate AMI_STREAM\n");
+  AMI_STREAM<T>* str = new AMI_STREAM<T>();
+  assert(str);
+  
+  //write the array to stream
+  AMI_err ae;
+  for (long i=0; i< n; i++) {
+    ae = str->write_item(a[i]);
+    assert(ae == AMI_ERROR_NO_ERROR);
+  }
+  assert(n == str->stream_len());
+
+  //insert the stream in the buffer
+  return insert(str);
+}
+
+
+
+
+/************************************************************/  
+/* insert a stream into the buffer; the next free entry in the buffer
+   is set to point to the stream; if SAVE_MEMORY flag is on, the
+   stream is put to disk;
+   
+   the <nextstream> pointer of buffer is set to point to the argument
+   stream; (in this way no stream copying is done, just one pointer
+   copy). The user should be aware the the argument stream is 'lost' -
+   that is a stream cannot be inserted repeatedly into many buffers
+   because this would lead to several buffers pointing to the same
+   stream.
+   
+   stream is assume stream is sorted; bos = how many elements must be
+   skipped (were deleted) from the begining fo stream;
+   
+   return the number of items actually inserted */
+template<class T, class Key>
+long em_buffer<T,Key>::insert(AMI_STREAM<T>* str, long bos=0) {
+
+  assert(str);
+  
+  if (is_full()) {
+    cout << "em_buffer::insert: buffer full\n";
+    return 0;
+  }
+  
+  //can only insert one level-i-full-stream at a time;
+  //relaxed..can specify bos;
+  //not only that, but the length of the stream can be smaller 
+  //than nominal length, because a stream is normally obtained by 
+  //merging streams which can be shorter;
+  //assert(str->stream_len() == get_stream_len() - bos);
+
+
+#ifdef EMBUF_CHECK_INSERT
+  //check that stream is sorted
+  cout << "CHECK_INSERT: checking stream is sorted\n";
+  AMI_err ae;
+  str->seek(0);
+  T *crt=NULL, *prev=NULL;
+  while (str->read_item(&crt)) {
+    assert(ae == AMI_ERROR_NO_ERROR);
+    if (prev) assert(*prev <= *crt);
+  }
+#endif
+  
+  //nextstream must be empty
+  assert(str);
+  assert(data[nextstream()] == NULL);
+  assert(deleted[nextstream()] == 0);
+  assert(streamsize[nextstream()] == 0);
+#ifdef SAVE_MEMORY
+  assert(name[nextstream()] == NULL);
+#endif
+
+
+  //set next entry i the buffer to point to this stream
+  data[nextstream()] = str;
+  deleted[nextstream()] = bos;
+  streamsize[nextstream()] = str->stream_len();
+#ifdef SAVE_MEMORY
+  //set next name entry in buffer to point to this stream's name
+  char* s;
+  str->name(&s); //name() allocates the string
+  name[nextstream()] = s; 
+  
+  //put stream on disk and release its memory
+  str->persist(PERSIST_PERSISTENT);
+  delete str;  //stream should be persistent; just delete it 
+  data[nextstream()] = NULL;
+
+#ifdef EMBUF_PRINT_INSERT
+  cout << "insert stream " << s << " at buf [" << level 
+       << "," << nextstream() << "]" << endl;
+#endif
+#endif
+  
+  //increment the index of next available stream in buffer
+  incr_nextstream();
+
+#ifdef EMBUF_PRINT_INSERT
+  print_stream_sizes();
+  print_stream_names();
+#endif
+  
+#ifdef SAVE_MEMORY
+  MY_LOG_DEBUG_ID("em_buffer::insert(): inserted stream ");
+  MY_LOG_DEBUG_ID(name[nextstream()-1]);
+#endif
+
+  //return nb of items inserted
+  return get_stream_len(nextstream()-1);
+}
+
+
+
+
+/************************************************************/  
+//print the elements of the i'th stream of the buffer to a stream;
+//assumes stream is in memory;
+template<class T, class Key>
+void em_buffer<T,Key>::print_stream(ostream& s, unsigned int i) {
+ 
+  assert(data[i]);
+  assert((i>=0) && (i<index));
+  
+  AMI_err ae;
+  T* x;
+
+  s << "STREAM " << i << ": [";      
+
+  ae = data[i]->seek(deleted[i]);
+  assert(ae == AMI_ERROR_NO_ERROR); 
+  
+  for (long j = 0; j < get_stream_len(i); j++) {
+    ae = data[i]->read_item(&x);
+    assert(ae == AMI_ERROR_NO_ERROR); 
+    s << *x << ",";
+  }
+  s << "]\n";
+}
+
+
+
+/************************************************************/ 
+//print elements range in buffer (read first and last element in each
+//substream and find global min and max)
+template<class T, class Key>
+void em_buffer<T,Key>::print_range() {
+
+  T *min, *max;
+  AMI_err ae;
+  
+  get_streams();
+
+  for (unsigned int i=0; i< index; i++) {
+    cout << "[";
+    //read min element in substream i
+    ae = data[i]->seek(deleted[i]);
+    assert(ae == AMI_ERROR_NO_ERROR);
+    ae = data[i]->read_item(&min);
+    assert(ae == AMI_ERROR_NO_ERROR);
+    cout << min->getPriority() << "..";
+    //read max element in substream i
+    ae = data[i]->seek(streamsize[i] - 1);
+    assert(ae == AMI_ERROR_NO_ERROR);
+    ae = data[i]->read_item(&max);
+    assert(ae == AMI_ERROR_NO_ERROR);
+    cout << max->getPriority() 
+	 << " (sz=" << get_stream_len(i) << ")] ";
+  }
+  for (unsigned int i=index; i< arity; i++) {
+    cout << "[] ";
+    }
+ 
+  put_streams();
+}
+
+
+
+/************************************************************/ 
+//print all elements in buffer
+template<class T, class Key>
+void em_buffer<T,Key>::print() {
+
+  T *x;
+  AMI_err ae;
+  
+  get_streams();
+
+  for (unsigned int i=0; i<index; i++) {
+    cout << "    [";
+    ae = data[i]->seek(deleted[i]);
+    assert(ae == AMI_ERROR_NO_ERROR);
+    for (unsigned long j=0; j<get_stream_len(i); j++) {
+      ae = data[i]->read_item(&x);
+      assert(ae == AMI_ERROR_NO_ERROR);
+      cout << x->getPriority() << ",";
+    }
+    cout << "]" << endl;
+  }
+  for (unsigned int i=index; i< arity; i++) {
+    cout << "[] ";
+  }
+
+  put_streams();
+}
+
+
+
+/************************************************************/ 
+//print the sizes of the substreams in the buffer
+template<class T, class Key>
+void em_buffer<T,Key>::print_stream_sizes() {
+
+  cout << "(streams=" << index << ") sizes=[";
+  for (unsigned int i=0; i< arity; i++) {
+    cout << get_stream_len(i) << ",";
+  }
+  cout << "]" << endl;
+  cout.flush();
+}
+
+
+
+#endif

Copied: grass/trunk/include/iostream/empq.h (from rev 32508, grass/trunk/raster/r.terraflow/IOStream/include/empq.h)
===================================================================
--- grass/trunk/include/iostream/empq.h	                        (rev 0)
+++ grass/trunk/include/iostream/empq.h	2008-08-04 12:21:11 UTC (rev 32509)
@@ -0,0 +1,294 @@
+/****************************************************************************
+ * 
+ *  MODULE:	r.terraflow
+ *
+ *  COPYRIGHT (C) 2007 Laura Toma
+ *   
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *****************************************************************************/
+
+#ifndef __EMPQ_H
+#define __EMPQ_H
+
+#include <stdio.h>
+#include <assert.h>
+
+
+
+#include "ami_config.h" //for SAVE_MEMORY
+#include "ami_stream.h"
+#include "mm.h"
+#include "mm_utils.h" //for MEMORY_LOG, getAvailableMemory
+#include "imbuffer.h"
+#include "embuffer.h"
+#include "pqheap.h"
+#include "minmaxheap.h"
+
+
+
+template<class T,class Key> class ExtendedEltMergeType;
+#define ExtendedMergeStream AMI_STREAM<ExtendedEltMergeType<T,Key> >
+
+
+/**********************************************************
+                  DEBUGGING FLAGS 
+***********************************************************/
+
+//enables printing messages when buffers are emptied
+//#define EMPQ_EMPTY_BUF_PRINT  
+
+//enables printing when pq gets filled from buffers
+//#define EMPQ_PQ_FILL_PRINT
+
+//enables priting inserts 
+//#define EMPQ_PRINT_INSERT
+
+//enables printing deletes
+//#define EMPQ_PRINT_EXTRACTALL
+
+//enables printing the empq on insert/extract_all_min
+//#define EMPQ_PRINT_EMPQ
+
+//enable priting the size of the EMPQ and nb of active streams
+//on fillpq() amd on empty_buff_0
+//#define EMPQ_PRINT_SIZE
+
+//enable printing 'fill pq from B0' in extract_min()
+//#define EMPQ_PRINT_FILLPQ_FROM_BUFF0
+
+//enable expensive size asserts
+//#define EMPQ_ASSERT_EXPENSIVE
+
+
+/**********************************************************/
+
+
+
+
+
+
+/* external memory priority queue
+
+   Functionality:
+
+   Keep a pqueue PQ of size \theta(M) in memory.  Keep a buffer B0 of
+   size \theta(M) in memory.  keep an array of external-memory
+   buffers, one for each level 1..log_m{n/m} (where N is the maximum
+   number of items in pqueue at any time).
+
+   invariants: 
+   1. PQ contains the smallest items in the structure.
+   
+   2. each stream of any external memory buffers is sorted in
+   increasing order.
+
+   insert(x): if (x < maximum_item(PQ) exchange x with
+   maximum_item(PQ); if buffer B0 is full, empty it; insert x in B0;
+   
+   extract_min():
+
+   analysis: 
+   
+   1. inserts: once the buffer B0 is empty, the next sizeof(B0)
+   inserts are free; one insert can cause many I/Os if cascading
+   emptying of external buffers Bi occurs. Emptying level-i buffer
+   costs <arity>^i*sizeof(B0)/B I/Os and occurs every
+   N/<arity>^i*sizeof(B0) inserts (or less, if deletes too). It can be
+   proved that the amortized time of 1 insert is 1/B*maxnb_buffers.
+*/
+
+/* 
+T is assumed to be a class for which getPriority() and getValue()
+are implemented; for simplicity it is assumed that the comparison
+operators have been overloaded on T such that 
+x < y <==> x.getPriority() < y.getPriority() 
+*/
+
+template<class T, class Key> 
+class em_pqueue {
+
+private:  
+  
+  //in memory priority queue
+  MinMaxHeap<T> *pq;
+  
+  //pqueue size
+  unsigned long pqsize;
+
+  //in-memory buffer
+  im_buffer<T> *buff_0;
+
+  //in-memory buffer size
+  unsigned long bufsize;
+
+  //external memory buffers
+  em_buffer<T,Key>** buff;
+  
+  /* number of external memory buffers statically allocated in the
+     beginning; since the number of buffers needed is \log_m{n/m}, we
+     cannot know it in advance; estimate it roughly and then reallocate
+     it dynamically on request;
+     
+     TO DO: dynamic reallocation with a bigger nb of external buffer
+     if structure becomes full */
+  unsigned short max_nbuf;
+  
+  //index of next external buffer entry available for use (i.e. is NULL)
+  unsigned short crt_buf;
+
+  //external buffer arity
+  unsigned int buf_arity;
+  
+
+public:
+  
+  //create an em_pqueue of specified size
+  em_pqueue(long pq_sz, long buf_sz, unsigned short nb_buf, 
+		   unsigned int buf_ar);  
+  
+  //create an em_pqueue capable to store <= N elements
+  em_pqueue();
+  em_pqueue(long N) { em_pqueue(); }; // N not used
+
+#ifdef SAVE_MEMORY
+  // create an empq, initialize its pq with im and insert amis in
+  // buff[0]; im should not be used/deleted after that outside empq
+  em_pqueue(MinMaxHeap<T> *im, AMI_STREAM<T> *amis);
+#endif
+
+  //copy constructor
+  em_pqueue(const em_pqueue &ep);
+
+  //clean up
+  ~em_pqueue();
+
+  //return the nb of elements in the structure 
+  unsigned long size();
+
+  //return true if empty
+  bool is_empty();
+  
+  //return true if full
+  bool is_full() { 
+    cout << "em_pqueue::is_full(): sorry not implemented\n";
+    exit(1);
+  }
+
+  //return the element with minimum priority in the structure
+  bool min(T& elt);
+  
+  //delete the element with minimum priority in the structure;
+  //return false if pq is empty
+  bool extract_min(T& elt);
+  
+  //extract all elts with min key, add them and return their sum
+  bool extract_all_min(T& elt);
+  
+  //insert an element; return false if insertion fails
+  bool insert(const T& elt);
+  
+  //return maximum capacity of i-th external buffer
+  long maxlen(unsigned short i);
+
+  //return maximum capacity of em_pqueue
+  long maxlen();
+
+  //print structure
+  void print_range();
+
+  void print();
+
+  //print the detailed size of empq (pq, buf_0, buff[i])
+  void print_size();
+
+  friend ostream& operator<<(ostream& s, const em_pqueue &empq) {
+    s << "EM_PQ: pq size=" << empq.pqsize
+      << ", buff_0 size=" << empq.bufsize 
+      << ", ext_bufs=" << empq.crt_buf 
+      << "(max " << empq.max_nbuf << ")\n";
+    s << "IN_MEMORY PQ: \n" << *(empq.pq) << "\n";
+    s << "IN_MEMORY BUFFER: \n" << *(empq.buff_0) << "\n";
+    for (unsigned short i=0; i < empq.crt_buf; i++) {
+      //s << "EM_BUFFER " << i << ":\n" ;
+      s << *(empq.buff[i]);
+    }
+    return s;
+  }
+
+
+protected:
+  //return the nb of active streams in the buffer
+  int active_streams() {
+    int totstr = 0;
+    for (unsigned short i = 0; i< crt_buf; i++) {
+      totstr+= buff[i]->get_nbstreams();
+    }
+    return totstr;
+  }
+
+  //called when buff_0 is full to empty it on external level_1 buffer;
+  //can produce cascading emptying
+  bool empty_buff_0();
+
+  //sort and empty buffer i into buffer (i+1) recursively; 
+  //called recursively by empty_buff_0() to empty subsequent buffers
+  //i must be a valid (i<crt_buf) full buffer
+  void empty_buff(unsigned short i);
+  
+  
+  /* merge the first <K> elements of the streams of input buffer,
+     starting at position <buf.deleted[i]> in each stream; there are
+     <buf.arity> streams in total; write output in <outstream>; the
+     items written in outstream are of type <merge_output_type> which
+     extends T with the stream nb and buffer nb the item comes from;
+     this information is needed later to distribute items back; do not
+     delete the K merged elements from the input streams; <bufid> is the
+     id of the buffer whose streams are being merged;
+     
+     the input streams are assumed sorted in increasing order of keys; */
+  AMI_err merge_buffer(em_buffer<T,Key> *buf,
+			      ExtendedMergeStream *outstr, long K);
+  
+
+  /* merge the first <K> elements of the input streams; there are
+     <arity> streams in total; write output in <outstream>;
+     
+     the input streams are assumed sorted in increasing order of their
+     keys; */
+  AMI_err merge_streams(ExtendedMergeStream** instr, 
+			       unsigned short arity,
+			       ExtendedMergeStream *outstr, long K);
+  
+  //deletes one element from <buffer, stream>
+  void delete_str_elt(unsigned short buf_id,
+			     unsigned int stream_id);
+
+  /* copy the minstream in the internal pqueue while merging with
+     buff_0; the smallest <pqsize> elements between minstream and
+     buff_0 will be inserted in internal pqueue; also, the elements
+     from minstram which are inserted in pqueue must be marked as
+     deleted in the source streams; */
+  void merge_bufs2pq(ExtendedMergeStream *minstream); 
+
+  //clean buffers in case some streams have been emptied 
+  void cleanup();
+  
+  //called when pq must be filled from external buffers
+  bool fillpq();
+
+  //print the nb of elements in each stream
+  void print_stream_sizes();
+};
+
+
+
+#endif

Copied: grass/trunk/include/iostream/empq_adaptive.h (from rev 32508, grass/trunk/raster/r.terraflow/IOStream/include/empq_adaptive.h)
===================================================================
--- grass/trunk/include/iostream/empq_adaptive.h	                        (rev 0)
+++ grass/trunk/include/iostream/empq_adaptive.h	2008-08-04 12:21:11 UTC (rev 32509)
@@ -0,0 +1,82 @@
+/****************************************************************************
+ * 
+ *  MODULE:	r.terraflow
+ *
+ *  COPYRIGHT (C) 2007 Laura Toma
+ *   
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *****************************************************************************/
+
+
+#ifndef __EMPQ_ADAPTIVE_H
+#define __EMPQ_ADAPTIVE_H
+
+
+#include "minmaxheap.h"
+#include "empq.h"
+#include "empq_impl.h"
+
+
+
+#define EMPQAD_DEBUG if(0)
+
+
+enum regim_type {
+  INMEM = 0,
+  EXTMEM,
+  EXTMEM_DEBUG
+};
+
+
+template<class T, class Key> 
+class EMPQueueAdaptive {
+private: 
+  //dictates if the structure works in the internal/external memory regim;
+  regim_type regim;  
+  MinMaxHeap<T> *im;
+  em_pqueue<T,Key> *em;
+  UnboundedMinMaxHeap<T> *dim;	// debug, internal memory pq
+public:
+  /* start in INMEM regim by allocating im of size precisely twice the
+     size of the (pqueue within) the em_pqueue; */
+  EMPQueueAdaptive(long N) : EMPQueueAdaptive() {};
+  EMPQueueAdaptive();
+  ~EMPQueueAdaptive();
+
+  void makeExternal();
+  void makeExternalDebug();
+
+  long maxlen() const;			//return the maximum nb of elts that can fit
+  bool is_empty() const;		//return true if empty
+  bool is_full() const;			//return true if full
+  bool min(T& elt);				//return the element with min priority XXX
+  //delete the element with minimum priority in the structure;
+  //return false if pq is empty
+  bool extract_min(T& elt);
+
+  //extract all elts with min key, add them and return their sum XXX
+  bool extract_all_min(T& elt);
+
+  /* insert an element; if regim == INMEM, try insert it in im, and if
+     it is full, extract_max pqsize/2 elements of im into a stream,
+     switch to EXTMEM and insert the stream into em; if regim is
+     EXTMEM, insert in em; */
+  bool insert(const T& elt);  
+
+  long size() const; //return the nb of elements in the structure
+
+  void verify();
+};
+
+
+
+#endif

Copied: grass/trunk/include/iostream/empq_adaptive_impl.h (from rev 32508, grass/trunk/raster/r.terraflow/IOStream/include/empq_adaptive_impl.h)
===================================================================
--- grass/trunk/include/iostream/empq_adaptive_impl.h	                        (rev 0)
+++ grass/trunk/include/iostream/empq_adaptive_impl.h	2008-08-04 12:21:11 UTC (rev 32509)
@@ -0,0 +1,431 @@
+/****************************************************************************
+ * 
+ *  MODULE:	r.terraflow
+ *
+ *  COPYRIGHT (C) 2007 Laura Toma
+ *   
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *****************************************************************************/
+
+#ifndef __EMPQ_ADAPTIVE_IMPL_H
+#define __EMPQ_ADAPTIVE_IMPL_H
+
+#include <stdio.h>
+#include <assert.h>
+
+#include "ami_config.h"
+#include "ami_stream.h"
+#include "mm.h"
+#include "mm_utils.h"
+#include "empq_adaptive.h"
+
+
+
+//defined in "empqAdaptive.H"
+//#define EMPQAD_DEBUG if(0)
+
+
+
+
+//------------------------------------------------------------
+//allocate an internal pqueue of size precisely twice 
+//the size of the pqueue within the em_pqueue;
+
+template<class T, class Key> 
+EMPQueueAdaptive<T,Key>::EMPQueueAdaptive() {
+  regim = INMEM;
+  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: starting in-memory pqueue" 
+		    << endl;
+  
+  //------------------------------------------------------------
+  //set the size precisely as in empq constructor since we cannot 
+  //really call the em__pqueue constructor, because we don't want 
+  //the space allocated; we just want the sizes;
+  AMI_err ae;
+  size_t mm_avail = getAvailableMemory();
+  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: available memory: " 
+		    << ( (float)mm_avail/ (1<< 20)) << "MB" << endl;
+  
+  /* same calculations as empq constructor in order to estimate
+     overhead memory; this is because we want to allocate a pqueue of
+     size exactly double the size of the pqueue inside the empq;
+     switching from this pqueue to empq when the memory fills up will
+     be simple */
+  //------------------------------------------------------------
+  //AMI_STREAM memory usage
+  size_t sz_stream;
+  AMI_STREAM<T> dummy;
+  if ((ae = dummy.main_memory_usage(&sz_stream,
+				    MM_STREAM_USAGE_MAXIMUM)) !=
+      AMI_ERROR_NO_ERROR) {
+    cerr << "EMPQueueAdaptive constr: failing to get stream_usage\n";
+    exit(1);
+  }  
+  //account for temporary memory usage
+  unsigned short max_nbuf = 2;
+  unsigned int buf_arity = mm_avail/(2 * sz_stream);
+  unsigned long mm_overhead = buf_arity*sizeof(merge_key<Key>) + 
+    max_nbuf * sizeof(em_buffer<T,Key>) +
+    2*sz_stream + max_nbuf*sz_stream;
+  mm_overhead *= 8; //overestimate..this should be fixed with
+  //a precise accounting of the extra memory required
+  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: memory overhead set to " 
+		    << ((float)mm_overhead / (1 << 20)) << "MB" << endl;
+  if (mm_overhead > mm_avail) {
+    cerr << "overhead bigger than available memory ("<< mm_avail << "); "
+	 << "increase -m and try again\n";
+    exit(1);
+  }
+  mm_avail -= mm_overhead;
+  //------------------------------------------------------------
+
+
+  long pqsize = mm_avail/sizeof(T);
+  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: pqsize set to " << pqsize << endl;
+
+  //initialize in memory pqueue and set em to NULL
+  im = new MinMaxHeap<T>(pqsize);
+  assert(im);
+  em = NULL;
+};
+
+
+
+
+template<class T, class Key> 
+EMPQueueAdaptive<T,Key>::~EMPQueueAdaptive() {
+  switch(regim) {
+  case INMEM:
+	delete im;
+	break;
+  case EXTMEM:
+	delete em; 
+	break;
+  case EXTMEM_DEBUG:
+	delete dim;
+	delete em; 
+	break;
+  }
+};
+
+
+
+//return the maximum nb of elts that can fit
+template<class T, class Key> 
+long 
+EMPQueueAdaptive<T,Key>::maxlen() const {
+  long m;
+  switch(regim) {
+  case INMEM:
+	assert(im);
+	m = im->get_maxsize();
+	break;
+  case EXTMEM:
+	assert(em);
+	m = em->maxlen();
+	break;
+  case EXTMEM_DEBUG:
+	m = em->maxlen();
+	break;
+  }
+  return m;
+};
+
+
+
+
+//return true if empty
+template<class T, class Key> 
+bool
+EMPQueueAdaptive<T,Key>::is_empty() const {
+  bool v = false;
+  switch(regim) {
+  case INMEM:
+	assert(im);
+	v = im->empty();
+	break;
+  case EXTMEM:
+	assert(em);
+	v = em->is_empty(); 
+	break;
+  case EXTMEM_DEBUG:
+	assert(dim->empty() == em->is_empty());
+	v = em->is_empty(); 
+	break;
+  }
+  return v;
+};
+
+
+//return true if full
+template<class T, class Key> 
+bool
+EMPQueueAdaptive<T,Key>::is_full() const { 
+  cerr << "EMPQueueAdaptive::is_full(): sorry not implemented\n";
+  assert(0);
+  exit(1);
+}
+
+
+//return the element with minimum priority in the structure
+template<class T, class Key> 
+bool
+EMPQueueAdaptive<T,Key>::min(T& elt) {
+  bool v=false, v1;
+  T tmp;
+  switch(regim) {
+  case INMEM:
+	assert(im);
+	v = im->min(elt);
+	break;
+  case EXTMEM:
+	assert(em);
+	v = em->min(elt);
+	break;
+  case EXTMEM_DEBUG:
+	v1 = dim->min(tmp);
+	v = em->min(elt);
+	//dim->verify();
+	if(!(tmp==elt)) {
+	  cerr << "------------------------------" << endl;
+	  cerr << dim << endl;
+	  cerr << "------------------------------" << endl;
+	  em->print();
+	  cerr << "------------------------------" << endl;
+	  cerr << "tmp=" << tmp << endl;
+	  cerr << "elt=" << elt << endl;
+	  cerr << "------------------------------" << endl;
+	  dim->destructiveVerify();
+	}
+	assert(v == v1);
+	assert(tmp == elt);
+	break;
+  }
+  return v;
+};
+
+template<class T, class Key> 
+void
+EMPQueueAdaptive<T,Key>::verify() {
+  switch(regim) {
+  case INMEM:
+	im->verify();
+	break;
+  case EXTMEM:
+	break;
+  case EXTMEM_DEBUG:
+	dim->verify();
+	break;
+  }
+}
+
+//extract all elts with min key, add them and return their sum
+template<class T, class Key> 
+bool 
+EMPQueueAdaptive<T,Key>::extract_all_min(T& elt) {
+  bool v=false, v1;
+  T tmp;
+  switch(regim) {
+  case INMEM:
+	assert(im);
+	v = im->extract_all_min(elt);
+	break;
+  case EXTMEM:
+	assert(em);
+	v = em->extract_all_min(elt);
+	break;
+  case EXTMEM_DEBUG:
+	v1 =  dim->extract_all_min(tmp);
+	v = em->extract_all_min(elt);
+	assert(dim->BasicMinMaxHeap<T>::size() == em->size());
+	assert(v == v1);
+	assert(tmp == elt);
+	break;
+  }
+  return v;
+};
+
+//return the nb of elements in the structure 
+template<class T, class Key> 
+long
+EMPQueueAdaptive<T,Key>::size() const {
+  long v=0, v1;
+  switch(regim) {
+  case INMEM:
+	assert(im);
+	v = im->size();
+	break;
+  case EXTMEM:
+	assert(em);
+	v = em->size();
+	break;
+  case EXTMEM_DEBUG:
+	v1 = dim->BasicMinMaxHeap<T>::size();
+	v = em->size();
+	assert(v == v1);
+	break;
+  }
+  return v;
+}
+
+
+
+
+// ----------------------------------------------------------------------
+template<class T, class Key> 
+bool 
+EMPQueueAdaptive<T,Key>::extract_min(T& elt) {
+    bool v=false, v1;
+	T tmp;
+	switch(regim) {
+	case INMEM:
+	  assert(im);
+	  v = im->extract_min(elt);
+	  break;
+	case EXTMEM:
+	  assert(em);
+	  v = em->extract_min(elt);
+	  break;
+	case EXTMEM_DEBUG:
+	  v1 =  dim->extract_min(tmp);
+	  v = em->extract_min(elt);
+	  assert(v == v1);
+	  assert(tmp == elt);
+	  assert(dim->BasicMinMaxHeap<T>::size() == em->size());
+	  break;
+    }
+	return v;
+};
+ 
+
+
+
+//------------------------------------------------------------
+ /* insert an element; if regim == INMEM, try insert it in im, and if
+    it is full, extract_max pqsize/2 elements of im into a stream,
+    switch to EXTMEM and insert the stream into em; if regim is
+    EXTMEM, insert in em; */
+template<class T, class Key> 
+bool 
+EMPQueueAdaptive<T,Key>::insert(const T& elt) {
+  bool v=false;
+  switch(regim) {
+  case INMEM:
+	if (!im->full()) {
+      im->insert(elt);
+	  v = true;
+    } else {
+	  makeExternal();
+	  v = em->insert(elt);      //insert the element
+	} 
+	break;
+  case EXTMEM:
+	v = em->insert(elt);
+	break;
+  case EXTMEM_DEBUG:
+	dim->insert(elt);
+	v = em->insert(elt);
+	assert(dim->BasicMinMaxHeap<T>::size() == em->size());
+	break;
+  }
+  return v;
+};
+
+template<class T, class Key> 
+void
+EMPQueueAdaptive<T,Key>::makeExternalDebug() {
+  assert(size() == 0);
+  switch(regim) {
+  case INMEM:
+	makeExternal();
+	break;
+  case EXTMEM:
+	break;
+  case EXTMEM_DEBUG:
+	assert(0);
+	break;
+  }
+  dim = new UnboundedMinMaxHeap<T>();
+  regim = EXTMEM_DEBUG;
+}
+
+
+
+template<class T>
+class baseCmpType {
+public:
+  static int compare(const T& x, const T& y) {
+    return  (x < y ? -1 : (x > y ? 1 : 0));
+  }
+
+};
+
+/* switch over to using an external priority queue */
+template<class T, class Key> 
+void
+EMPQueueAdaptive<T,Key>::makeExternal() {
+  AMI_err ae;
+#ifndef NDEBUG
+  long sizeCheck;
+  sizeCheck = size();
+#endif
+
+  assert(regim == INMEM);  
+  regim = EXTMEM;
+
+  EMPQAD_DEBUG cout << endl 
+			 << "EMPQUEUEADAPTIVE: memory full: "
+			 << "switching to external-memory pqueue " << endl;
+  
+  //create an AMI_stream and write in it biggest half elts of im;
+  AMI_STREAM<T> *amis0 = new AMI_STREAM<T>();
+  AMI_STREAM<T> *amis1;
+  assert(amis0 && amis1);
+  unsigned long pqsize = im->size();
+  //assert(im->size() == im->get_maxsize());
+  T x;
+  for (unsigned long i=0; i< pqsize/2; i++) {
+	int z = im->extract_max(x);
+	assert(z);
+	ae = amis0->write_item(x);
+	assert(ae == AMI_ERROR_NO_ERROR);
+  }
+  assert(amis0->stream_len() == pqsize/2);
+  EMPQAD_DEBUG { cout << "written " << pqsize/2 
+			   << " elts to stream\n"; cout.flush(); }
+  
+  assert(im->size() == pqsize/2 + (pqsize % 2));
+  
+  EMPQAD_DEBUG LOG_avail_memo();
+  
+  //sort the stream
+  baseCmpType<T> fun;
+  AMI_sort(amis0, &amis1, &fun); //XXX laura: replaced this to use a cmp obj
+  delete amis0;
+  EMPQAD_DEBUG { cout << "sorted the stream\n"; cout.flush(); }
+  
+  EMPQAD_DEBUG LOG_avail_memo();
+  
+  //set im to NULL and initialize em from im and amis1
+  em = new em_pqueue<T,Key>(im, amis1);
+  im = NULL;
+  assert(em);
+  EMPQAD_DEBUG { cout << "empq initialized from im\n"; cout.flush(); }
+  EMPQAD_DEBUG {em->print_size();}
+  
+  EMPQAD_DEBUG LOG_avail_memo();
+#ifndef NDEBUG
+  assert(sizeCheck == size());
+#endif
+};
+
+#endif

Copied: grass/trunk/include/iostream/empq_impl.h (from rev 32508, grass/trunk/raster/r.terraflow/IOStream/include/empq_impl.h)
===================================================================
--- grass/trunk/include/iostream/empq_impl.h	                        (rev 0)
+++ grass/trunk/include/iostream/empq_impl.h	2008-08-04 12:21:11 UTC (rev 32509)
@@ -0,0 +1,1511 @@
+/****************************************************************************
+ * 
+ *  MODULE:	r.terraflow
+ *
+ *  COPYRIGHT (C) 2007 Laura Toma
+ *   
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *****************************************************************************/
+
+#ifndef __EMPQ_IMPL_H
+#define __EMPQ_IMPL_H
+
+#include <stdio.h>
+
+#if __GNUC__ > 3 || (__GNUC__ == 3 && __GNUC_MINOR__ >= 1)
+#include <ostream>
+#else
+#include <ostream.h>
+#endif
+
+using namespace std;
+
+#include "empq.h"
+
+
+
+#if(0)
+#include "option.H"
+#define MY_LOG_DEBUG_ID(x) \
+  if(GETOPT("debug")) cerr << __FILE__ << ":" << __LINE__<< " " << x << endl;
+#endif
+
+#undef XXX
+#define XXX if(0)
+
+#define MY_LOG_DEBUG_ID(x)
+
+/*****************************************************************/
+/* encapsulation of the element=<key/prio, data> together with <buffer_id>
+   and <stream_id>; used during stream merging to remember where each
+   key comes from; 
+
+   assumes that class T implements: Key getPriority()
+
+   implements operators {<, <=, ...} such that a< b iff a.x.prio < b.x.prio
+*/
+template<class T,class Key> 
+class ExtendedEltMergeType {
+
+private:
+  T x;
+  unsigned short buf_id;
+  unsigned int str_id;
+  
+public:
+  ExtendedEltMergeType() {}
+  
+  ExtendedEltMergeType(T &e, unsigned short bid, unsigned int sid):
+    x(e), buf_id(bid), str_id(sid) {}
+  
+  ~ExtendedEltMergeType() {}
+  
+  void set (T &e, unsigned short bid, unsigned int sid) {
+    x = e;
+    buf_id = bid;
+    str_id = sid;
+  }
+  T elt() const {
+    return x; 
+  }
+  unsigned short buffer_id() const  {
+    return buf_id;
+  }
+  unsigned int stream_id()  const {
+    return str_id;
+  }
+  Key getPriority() const {
+    return x.getPriority();
+  }
+  //print
+  friend ostream& operator<<(ostream& s, 
+				    const ExtendedEltMergeType<T,Key> &elt) {
+    return s << "<buf_id=" << elt.buf_id 
+	     << ",str_id=" << elt.str_id << "> "
+	     << elt.x << " ";
+  }
+  
+  friend int operator < (const ExtendedEltMergeType<T,Key> &e1, 
+				const ExtendedEltMergeType<T,Key> &e2) {
+    return (e1.getPriority() < e2.getPriority());
+  }
+  friend int operator <= (const ExtendedEltMergeType<T,Key> &e1, 
+				 const ExtendedEltMergeType<T,Key> &e2) {
+    return (e1.getPriority() <= e2.getPriority());
+  }
+  friend int operator > (const ExtendedEltMergeType<T,Key> &e1, 
+				const ExtendedEltMergeType<T,Key> &e2) {
+    return (e1.getPriority() > e2.getPriority());
+  }
+  friend int operator >= (const ExtendedEltMergeType<T,Key> &e1, 
+				 const ExtendedEltMergeType<T,Key> &e2) {
+    return (e1.getPriority() >= e2.getPriority());
+  }
+  friend int operator != (const ExtendedEltMergeType<T,Key> &e1, 
+				 const ExtendedEltMergeType<T,Key> &e2) {
+    return (e1.getPriority() != e2.getPriority());
+  }
+  friend int operator == (const ExtendedEltMergeType<T,Key> &e1, 
+				 const ExtendedEltMergeType<T,Key> &e2) {
+    return (e1.getPriority() == e2.getPriority());
+  }
+
+};
+
+
+
+//************************************************************/
+//create an em_pqueue
+template<class T, class Key>
+ em_pqueue<T,Key>::em_pqueue(long pq_sz, long buf_sz , 
+				    unsigned short nb_buf, 
+				    unsigned int buf_ar):
+  pqsize(pq_sz), bufsize(buf_sz), max_nbuf(nb_buf), 
+  crt_buf(0), buf_arity(buf_ar) {
+  
+  //____________________________________________________________
+  //ESTIMATE AVAILABLE MEMORY BEFORE ALLOCATION
+  AMI_err ae;
+  size_t mm_avail = getAvailableMemory();
+  printf("EM_PQUEUE:available memory before allocation: %.2fMB\n", 
+	       mm_avail/(float)(1<<20));
+  printf("EM_PQUEUE:available memory before allocation: %ldB\n", 
+	       mm_avail);
+  //____________________________________________________________
+  //ALLOCATE STRUCTURE
+   //some dummy checks..
+  assert(pqsize > 0 && bufsize > 0);
+
+  MEMORY_LOG("em_pqueue: allocating int pqueue\n");
+  //initialize in memory pqueue
+  pq = new MinMaxHeap<T>(pqsize);
+  assert(pq);
+
+  MEMORY_LOG("em_pqueue: allocating buff_0\n");
+  //initialize in memory buffer
+  buff_0 = new im_buffer<T>(bufsize);
+  assert(buff_0);
+
+  char str[200];
+  sprintf(str, "em_pqueue: allocating array of %ld buff pointers\n",
+		  (long)max_nbuf);
+  MEMORY_LOG(str);
+  
+  //allocate ext memory buffers array
+  buff = new em_buffer<T,Key>* [max_nbuf];
+  assert(buff);
+  for (unsigned short i=0; i<max_nbuf; i++) {
+    buff[i] = NULL;
+  }
+
+
+  //____________________________________________________________
+  //some memory checks- make sure the empq fits in memory !!
+
+  //estimate available memory after allocation 
+  mm_avail = getAvailableMemory();
+  printf("EM_PQUEUE: available memory after allocation: %.2fMB\n", 
+	       mm_avail/(float)(1<<20));
+  
+  //estimate AMI_STREAM memory usage
+  size_t sz_stream;
+  AMI_STREAM<T> dummy;
+  if ((ae = dummy.main_memory_usage(&sz_stream,
+				    MM_STREAM_USAGE_MAXIMUM)) !=
+      AMI_ERROR_NO_ERROR) {
+    cout << "em_pqueue constructor: failing to get stream_usage\n";
+    exit(1);
+  }  
+  cout << "EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
+  cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
+  
+  //estimate memory overhead
+  long mm_overhead = buf_arity*sizeof(merge_key<Key>) + 
+    max_nbuf * sizeof(em_buffer<T,Key>) +
+    2*sz_stream + max_nbuf*sz_stream;
+  
+  mm_overhead *= 8; //overestimate
+  cout << "EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
+  if (mm_overhead > mm_avail) {
+    cout << "overhead bigger than available memory"
+	 << "increase -m and try again\n";
+    exit(1);
+  }
+  mm_avail -= mm_overhead;
+  
+
+  //arity*sizeof(AMI_STREAM) < memory
+  cout << "pqsize=" << pqsize
+       << ", bufsize=" << bufsize
+       << ", maximum allowed arity=" << mm_avail/sz_stream << endl;
+  if (buf_arity * sz_stream > mm_avail) {
+    cout << "sorry - empq excedes memory limits\n";
+    cout << "try again decreasing arity or pqsize/bufsize\n";
+    cout.flush();
+  }
+}
+
+
+//************************************************************/
+//create an em_pqueue capable to store <= N elements
+template<class T, class Key>
+em_pqueue<T,Key>::em_pqueue() {
+  
+  MY_LOG_DEBUG_ID("em_pqueue constructor");
+
+
+  /************************************************************/
+  //available memory 
+  AMI_err ae;
+  //available memory
+  size_t mm_avail = getAvailableMemory();
+  printf("EM_PQUEUE:available memory before allocation: %.2fMB\n", 
+	       mm_avail/(float)(1<<20));
+  cout.flush();
+
+  //AMI_STREAM memory usage
+  size_t sz_stream;
+  AMI_STREAM<T> dummy;
+  if ((ae = dummy.main_memory_usage(&sz_stream,
+				    MM_STREAM_USAGE_MAXIMUM)) !=
+      AMI_ERROR_NO_ERROR) {
+    cout << "em_pqueue constructor: failing to get main_memory_usage\n";
+    exit(1);
+  }  
+  cout << "EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
+  cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
+  cout.flush();
+  //assume max_nbuf=2 suffices; check after arity is computed
+  max_nbuf = 2;
+
+  //account for temporary memory usage (set up a preliminary arity)
+  buf_arity = mm_avail/(2 * sz_stream);
+  long mm_overhead = buf_arity*sizeof(merge_key<Key>) + 
+    max_nbuf * sizeof(em_buffer<T,Key>) +
+    2*sz_stream + max_nbuf*sz_stream;
+  
+  mm_overhead *= 8; //overestimate
+  cout << "EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
+  if (mm_overhead > mm_avail) {
+    cout << "overhead bigger than available memory"
+	 << "increase -m and try again\n";
+    exit(1);
+  }
+  mm_avail -= mm_overhead;
+  
+  
+#ifdef SAVE_MEMORY
+  //assign M/2 to pq
+  pqsize = mm_avail/(2*sizeof(T));
+  //assign M/2 to buff_0
+  bufsize = mm_avail/(2*sizeof(T));
+#else 
+  //assign M/4 to pq
+  pqsize = mm_avail/(4*sizeof(T));
+  //assign M/4 to buff_0
+  bufsize = mm_avail/(4*sizeof(T));
+#endif
+  
+  cout << "EM_PQUEUE: pqsize set to " << pqsize << endl;
+  cout << "EM_PQUEUE: bufsize set to " << bufsize << endl; 
+  cout << "EM_PQUEUE: nb buffers set to " << max_nbuf << endl; 
+ 
+  
+  //assign M/2 to AMI_STREAMS and compute arity
+  /* arity is mainly constrained by the size of an AMI_STREAM; the
+     rest of the memory must accomodate for arity * max_nbuf
+     *sizeof(AMI_STREAM); there are some temporary stuff like arity *
+     sizeof(long) (the deleted array), arity * sizeof(T) (the array of
+     keys for merging) and so on, but the main factor is the
+     AMI_STREAM size which is roughly B * LBS * 2 (each AMI_STREAM
+     allocates 2 logical blocks) */
+#ifdef SAVE_MEMORY
+ buf_arity = mm_avail/(2 * sz_stream);
+#else
+ buf_arity = mm_avail/(2 * max_nbuf * sz_stream); 
+#endif
+
+  //overestimate usage
+  if (buf_arity > 3) {
+    buf_arity -= 3;
+  } else {
+    buf_arity = 1;
+  }
+
+  cout << "EM_PQUEUE: arity set to " << buf_arity << endl;
+  
+  crt_buf = 0; 
+  
+  //initialize in memory pqueue
+  MEMORY_LOG("em_pqueue: allocating int pqueue\n");
+  pq = new MinMaxHeap<T>(pqsize);
+  assert(pq);
+  
+  //initialize in memory buffer
+  MEMORY_LOG("em_pqueue: allocating buff_0\n");
+  buff_0 = new im_buffer<T>(bufsize);
+  assert(buff_0);
+  
+  //allocate ext memory buffers array
+  char str[200];
+  sprintf(str,"em_pqueue: allocating array of %ld buff pointers\n",
+	  (long)max_nbuf);
+  MEMORY_LOG(str);
+  //allocate ext memory buffers array
+  buff = new em_buffer<T,Key>* [max_nbuf];
+  assert(buff);
+  for (unsigned short i=0; i<max_nbuf; i++) {
+    buff[i] = NULL;
+  }
+
+  //max nb of items the structure can accomodate (constrained by max_nbuf)
+  cout << "EM_PQUEUE: maximum length is " << maxlen() << "\n";
+  cout.flush(); 
+  
+  //check that structure can accomodate N elements
+  //  assert(N < buf_arity * (buf_arity + 1) * bufsize);
+  //assert(N < maxlen());  
+  mm_avail = getAvailableMemory();
+  printf("EM_PQUEUE: available memory after allocation: %.2fMB\n", 
+	       mm_avail/(float)(1<<20));
+}
+
+
+#ifdef SAVE_MEMORY
+//************************************************************/
+// create an empq, initialize its pq with im and insert amis in
+// buff[0]; im should not be used/deleted after that outside empq;
+//
+// assumption: im was allocated such that maxsize = mm_avail/T;
+// when this constructor is called im is only half full, so we must 
+// free half of its space and  give to buff_0
+template<class T, class Key>
+em_pqueue<T,Key>::em_pqueue(MinMaxHeap<T> *im, AMI_STREAM<T> *amis) {
+  AMI_err ae;
+  int pqcapacity;	   /* amount of memory we can use for each of new
+						  minmaxheap, and em-buffer */ 
+  unsigned int pqcurrentsize;   /* number of elements currently in im */
+  assert(im && amis);
+
+  pqcapacity = im->get_maxsize()/2; // we think this memory is now available
+  pqcurrentsize = im->size();
+  pqsize = pqcapacity;
+
+  LOG_avail_memo();
+ 
+  /* at this point im is allocated all memory, but it is only at most
+     half full; we need to relocate im to half space and to allocate
+     buff_0 the other half; since we use new, there is no realloc, so
+     we will copy to a file...*/
+
+  {
+	//copy im to a stream and free its memory
+	T x;
+	AMI_STREAM<T> tmpstr;
+	for (unsigned int i=0; i<pqcurrentsize; i++) {
+	  im->extract_min(x);
+    ae = tmpstr.write_item(x);
+    assert(ae == AMI_ERROR_NO_ERROR);
+	}
+	delete im;
+	LOG_avail_memo();
+	
+	//allocate pq and buff_0 half size
+	bufsize = pqcapacity;
+	cout << "EM_PQUEUE: allocating im_buffer size=" << bufsize
+		 << " total " << (float)bufsize*sizeof(T)/(1<<20) << "MB\n"; 
+	cout.flush();
+	buff_0 = new im_buffer<T>(bufsize);
+	assert(buff_0);
+	cout << "EM_PQUEUE: allocating pq size=" << pqcapacity
+		 << " total " << (float)pqcapacity*sizeof(T)/(1<<20)  << "MB\n"; 
+	cout.flush();
+	pq = new MinMaxHeap<T>(pqcapacity);
+	assert(pq);
+	
+	//fill pq from tmp stream
+	ae =  tmpstr.seek(0);
+	assert(ae == AMI_ERROR_NO_ERROR);
+	T *elt;
+	for (unsigned int i=0; i<pqcurrentsize; i++) {
+	  ae = tmpstr.read_item(&elt);
+	  assert(ae == AMI_ERROR_NO_ERROR);
+	  pq->insert(*elt);
+	}
+	assert(pq->size() == pqcurrentsize);
+  }
+
+  //estimate buf_arity
+  //AMI_STREAM memory usage
+  size_t sz_stream;
+  AMI_STREAM<T> dummy;
+  if ((ae = dummy.main_memory_usage(&sz_stream,
+				    MM_STREAM_USAGE_MAXIMUM)) !=
+      AMI_ERROR_NO_ERROR) {
+    cout << "em_pqueue constructor: failing to get main_memory_usage\n";
+    exit(1);
+  }  
+  cout << "EM_PQUEUE: AMI_stream memory usage: " << sz_stream << endl;
+  cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
+  //assume max_nbuf=2 suffices; check after arity is computed
+  max_nbuf = 2;
+  buf_arity = pqcapacity * sizeof(T) / sz_stream;
+  //should account for some overhead
+  if (buf_arity == 0) {
+    cout << "EM_PQUEUE: arity=0 (not enough memory..)\n";
+    exit(1);
+  }
+   if (buf_arity > 3) {
+     buf_arity -= 3;
+   } else {
+     buf_arity = 1;
+   }
+
+  //allocate ext memory buffer array
+  char str[200];
+  sprintf(str,"em_pqueue: allocating array of %ld buff pointers\n",
+	  (long)max_nbuf);
+  MEMORY_LOG(str);
+  buff = new em_buffer<T,Key>* [max_nbuf];
+  assert(buff);
+  for (unsigned short i=0; i<max_nbuf; i++) {
+    buff[i] = NULL;
+  }    
+  crt_buf = 0;
+
+  cout << "EM_PQUEUE: new pqsize set to " << pqcapacity << endl;
+  cout << "EM_PQUEUE: bufsize set to " << bufsize << endl; 
+  cout << "EM_PQUEUE: buf arity set to " << buf_arity << endl;
+  cout << "EM_PQUEUE: nb buffers set to " << max_nbuf << endl; 
+  cout << "EM_PQUEUE: maximum length is " << maxlen() << "\n";
+  cout.flush(); 
+
+  //estimate available remaining memory 
+  size_t mm_avail = getAvailableMemory();
+  printf("EM_PQUEUE: available memory after allocation: %.2fMB\n", 
+	 mm_avail/(float)(1<<20));
+  
+  //last thing: insert the input stream in external buffers
+  //allocate buffer if necessary
+  //assert(crt_buf==0 && !buff[0]);// given
+  if(amis->stream_len()) {
+	//create buff[0] as a level1 buffer
+    MEMORY_LOG("em_pqueue::empty_buff_0: create new em_buffer\n");
+    buff[0] = new em_buffer<T,Key>(1, bufsize, buf_arity); 
+	buff[0]->insert(amis);
+	crt_buf = 1;
+  }
+}
+
+#endif
+
+
+
+//************************************************************/
+//free space  
+template<class T, class Key>
+em_pqueue<T,Key>::~em_pqueue() {
+  //delete in memory pqueue
+  if (pq) delete pq;
+  //delete in memory buffer
+  if (buff_0) delete buff_0;
+  //delete ext memory buffers
+  for (unsigned short i=0; i< crt_buf; i++) {
+    if (buff[i]) delete buff[i];
+  }
+  delete [] buff;
+}
+
+
+//************************************************************/
+//return maximum capacity of i-th external buffer
+template<class T, class Key>
+long  em_pqueue<T,Key>::maxlen(unsigned short i) {
+  
+  if (i >= max_nbuf) {
+    printf("em_pqueue::max_len: level=%d exceeds capacity=%d\n", 
+	   i, max_nbuf);
+    return 0;
+  }
+  if (i < crt_buf) {
+    return buff[i]->get_buf_maxlen();
+  }
+  //try allocating buffer
+  em_buffer<T,Key> * tmp = new em_buffer<T,Key>(i+1, bufsize, buf_arity);
+  if (!tmp) {
+    cout << "em_pqueue::max_len: cannot allocate\n";
+    return 0;
+  }
+  long len = tmp->get_buf_maxlen();
+  delete tmp;
+  return len;
+}
+
+
+//************************************************************/
+//return maximum capacity of em_pqueue
+template<class T, class Key>
+long em_pqueue<T,Key>::maxlen() {
+  long len = 0;
+  for (unsigned short i=0; i< max_nbuf; i++) {
+    len += maxlen(i);
+  }
+  return len + buff_0->get_buf_maxlen();
+}
+
+
+
+//************************************************************/
+//return the total nb of elements in the structure 
+template<class T, class Key>
+unsigned long em_pqueue<T,Key>::size() {
+  //sum up the lenghts(nb of elements) of the external buffers 
+  unsigned long elen = 0;
+  for (unsigned short i=0; i < crt_buf; i++) {
+    elen += buff[i]->get_buf_len();
+  }
+  return elen + pq->size() + buff_0->get_buf_len();
+}
+
+
+//************************************************************/
+//return true if empty
+template<class T, class Key>
+bool em_pqueue<T,Key>::is_empty() {
+  
+  //return (size() == 0);
+  //more efficient?
+  return ((pq->size() == 0) && (buff_0->get_buf_len() == 0) && 
+	  (size() == 0));
+}
+
+
+//************************************************************/
+//called when pq must be filled from external buffers
+template<class T, class Key>
+bool em_pqueue<T,Key>::fillpq() {
+  
+#ifndef NDEBUG
+  {
+	int k=0;
+	for (unsigned short i=0; i<crt_buf; i++) {
+	  k |= buff[i]->get_buf_len();
+	}
+	if(!k) {
+	  cerr << "fillpq called with empty external buff!" << endl;
+	}
+	assert(k);
+  }
+#endif
+
+#ifdef EMPQ_PQ_FILL_PRINT
+  cout << "filling pq\n"; cout .flush();
+#endif
+  XXX cerr << "filling pq" << endl; 
+  MY_LOG_DEBUG_ID("fillpq");
+
+  AMI_err ae;
+  {
+	char str[200];
+	sprintf(str, "em_pqueue::fillpq: allocate array of %hd AMI_STREAMs\n",
+			crt_buf);
+	MEMORY_LOG(str);
+  }
+  //merge pqsize smallest elements from each buffer into a new stream
+  ExtendedMergeStream** outstreams;
+  //gcc-3.4 doesn't allows (TYPE*)[SIZE] declarations
+  //use TYPE*[SIZE]
+  outstreams = new ExtendedMergeStream*[crt_buf];
+
+  for (unsigned short i=0; i< crt_buf; i++) {
+    MY_LOG_DEBUG_ID(crt_buf);
+	outstreams[i] = new ExtendedMergeStream();
+	assert(buff[i]->get_buf_len());
+    ae = merge_buffer(buff[i], outstreams[i], pqsize);
+    assert(ae == AMI_ERROR_NO_ERROR);
+	assert(outstreams[i]->stream_len());
+    //print_stream(outstreams[i]); cout.flush();
+  }
+  
+  if (crt_buf == 1) {
+    //just one level; make common case faster :)
+    merge_bufs2pq(outstreams[0]);
+	delete outstreams[0];
+	delete [] outstreams;
+  } else {
+    //merge the outstreams to get the global mins and delete them afterwards
+    ExtendedMergeStream *minstream = new ExtendedMergeStream();
+    //cout << "merging streams\n";
+    ae = merge_streams(outstreams, crt_buf, minstream, pqsize);
+    assert(ae == AMI_ERROR_NO_ERROR);
+	for (unsigned short i=0; i< crt_buf; i++) {
+	  delete outstreams[i];
+	}
+    delete [] outstreams;
+    
+    //copy the minstream in the internal pqueue while merging with buff_0;
+    //the smallest <pqsize> elements between minstream and buff_0 will be
+    //inserted in internal pqueue;
+    //also, the elements from minstram which are inserted in pqueue must be
+    //marked as deleted in the source streams;
+    merge_bufs2pq(minstream);
+	delete minstream;
+    //cout << "after merge_bufs2pq: \n" << *this << "\n";
+  }
+  XXX assert(pq->size());
+  XXX cerr << "fillpq done" << endl;
+  return true;
+}
+
+
+
+//************************************************************/
+//return the element with minimum priority in the structure
+template<class T, class Key>
+bool 
+em_pqueue<T,Key>::min(T& elt){
+  
+  bool ok;
+
+  MY_LOG_DEBUG_ID("em_pqueue::min");
+
+  //try first the internal pqueue
+  if (!pq->empty()) {
+    ok = pq->min(elt);
+    assert(ok);
+    return ok;
+  }
+
+  MY_LOG_DEBUG_ID("extract_min: reset pq");
+  pq->reset();
+
+  //if external buffers are empty
+  if (crt_buf == 0) {
+    //if internal buffer is empty too, then nothing to extract
+    if (buff_0->is_empty()) {
+      //cerr << "min called on empty empq" << endl;
+      return false;
+    } else {
+#ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
+      cout << "filling pq from B0\n"; cout.flush();
+#endif
+      //ext. buffs empty; fill int pqueue from buff_0 
+      long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
+      buff_0->reset(pqsize, n);
+      ok = pq->min(elt);
+      assert(ok); 
+      return true;
+    }
+  } else {
+    //external buffers are not empty;
+    //called when pq must be filled from external buffers
+	XXX print_size();
+    fillpq();
+    XXX cerr << "fillpq done; about to take min" << endl;
+    ok = pq->min(elt);
+    XXX cerr << "after taking min" << endl;
+    assert(ok);
+    return ok;
+  } //end of else (if ext buffers are not empty)
+  
+  assert(0); //not reached
+}
+
+
+
+//************************************************************/
+template<class T,class Key>
+static void print_ExtendedMergeStream(ExtendedMergeStream &str) {
+  
+  ExtendedEltMergeType<T,Key> *x;
+  str.seek(0);
+  while (str.read_item(&x) == AMI_ERROR_NO_ERROR) {
+    cout << *x << ", ";
+  }
+  cout << "\n";
+}
+
+
+//************************************************************/
+//delete the element with minimum priority in the structure;
+//return false if pq is empty
+template<class T, class Key>
+bool em_pqueue<T,Key>::extract_min(T& elt) {
+
+  bool ok;
+
+  MY_LOG_DEBUG_ID("\n\nEM_PQUEUE::EXTRACT_MIN");
+  
+  //try first the internal pqueue
+  if (!pq->empty()) {
+    //cout << "extract from internal pq\n";
+    MY_LOG_DEBUG_ID("extract from internal pq");
+    ok = pq->extract_min(elt);
+    assert(ok);
+    return ok;
+  }
+
+  //if internal pqueue is empty, fill it up
+  MY_LOG_DEBUG_ID("internal pq empty: filling it up from external buffers");
+  MY_LOG_DEBUG_ID("extract_min: reset pq");
+  pq->reset();
+
+  //if external buffers are empty
+  if (crt_buf == 0) {
+    //if internal buffer is empty too, then nothing to extract
+    if (buff_0->is_empty()) {
+      return false;
+    } else {
+#ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
+      cout << "filling pq from B0\n"; cout.flush();
+#endif
+      MY_LOG_DEBUG_ID("filling pq from buff_0");
+      //ext. buffs empty; fill int pqueue from buff_0 
+      long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
+      buff_0->reset(pqsize, n);
+      ok = pq->extract_min(elt);
+      assert(ok); 
+      return true;
+    }
+  } else {
+    //external buffers are not empty;
+    //called when pq must be filled from external buffers
+    MY_LOG_DEBUG_ID("filling pq from buffers");
+#ifdef EMPQ_PRINT_SIZE
+    long x = size(), y;
+    y = x*sizeof(T) >> 20;
+    cout << "pqsize:[" << active_streams() << " streams: ";
+    print_stream_sizes();
+    cout << " total " << x << "(" << y << "MB)]" << endl;
+    cout.flush();
+#endif
+    fillpq();
+    MY_LOG_DEBUG_ID("pq filled");
+    XXX cerr << "about to get the min" << endl;
+    assert(pq);
+    ok = pq->extract_min(elt);
+    if (!ok) {
+      cout << "failing assertion: pq->extract_min == true\n";
+      this->print();
+      assert(ok);
+    }
+
+    return ok;
+  } //end of else (if ext buffers are not empty)
+  
+  assert(0); //not reached
+}
+
+
+
+//************************************************************/
+//extract all elts with min key, add them and return their sum
+//delete the element with minimum priority in the structure;
+//return false if pq is empty
+template<class T, class Ke