[GRASS-SVN] r32625 - in grass/trunk: include/iostream lib/iostream

svn_grass at osgeo.org svn_grass at osgeo.org
Thu Aug 7 17:13:14 EDT 2008


Author: pkelly
Date: 2008-08-07 17:13:13 -0400 (Thu, 07 Aug 2008)
New Revision: 32625

Modified:
   grass/trunk/include/iostream/ami.h
   grass/trunk/include/iostream/ami_config.h
   grass/trunk/include/iostream/ami_sort.h
   grass/trunk/include/iostream/ami_sort_impl.h
   grass/trunk/include/iostream/ami_stream.h
   grass/trunk/include/iostream/embuffer.h
   grass/trunk/include/iostream/empq.h
   grass/trunk/include/iostream/empq_adaptive.h
   grass/trunk/include/iostream/empq_adaptive_impl.h
   grass/trunk/include/iostream/empq_impl.h
   grass/trunk/include/iostream/imbuffer.h
   grass/trunk/include/iostream/mem_stream.h
   grass/trunk/include/iostream/minmaxheap.h
   grass/trunk/include/iostream/mm.h
   grass/trunk/include/iostream/mm_utils.h
   grass/trunk/include/iostream/pqheap.h
   grass/trunk/include/iostream/queue.h
   grass/trunk/include/iostream/quicksort.h
   grass/trunk/include/iostream/replacementHeap.h
   grass/trunk/include/iostream/replacementHeapBlock.h
   grass/trunk/include/iostream/rtimer.h
   grass/trunk/lib/iostream/ami_stream.cc
   grass/trunk/lib/iostream/mm.cc
   grass/trunk/lib/iostream/mm_utils.cc
   grass/trunk/lib/iostream/rtimer.cc
Log:
Updates to iostream library from Laura Toma.


Modified: grass/trunk/include/iostream/ami.h
===================================================================
--- grass/trunk/include/iostream/ami.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/ami.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,7 @@
+
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -16,7 +17,6 @@
  *
  *****************************************************************************/
 
-
 #ifndef _AMI_H
 #define _AMI_H
 

Modified: grass/trunk/include/iostream/ami_config.h
===================================================================
--- grass/trunk/include/iostream/ami_config.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/ami_config.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,7 @@
+
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   

Modified: grass/trunk/include/iostream/ami_sort.h
===================================================================
--- grass/trunk/include/iostream/ami_sort.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/ami_sort.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -76,8 +76,9 @@
 template<class T, class Compare>
 AMI_err 
 AMI_sort(AMI_STREAM<T> *instream, AMI_STREAM<T> **outstream, Compare *cmp, 
-	 int deleteInputStream = 0) {
-  char* name;
+	 int deleteInputStream = 0)
+{
+  char* name=NULL;
   queue<char*>* runList;
   int instreamLength;
 
@@ -102,18 +103,30 @@
   
   //run formation
   runList = runFormation(instream, cmp);
-  assert(runList && runList->length() > 0); 
+  assert(runList); 
 
   if (deleteInputStream) {
     delete instream;
   }
 
-  if (runList->length() == 1) {
+  if(runList->length() == 0) {
+    /* self-check */
+    fprintf(stderr, "ami_sort: Error - no runs created!\n");
+    instream->name(&name);
+    cout << "ami_sort: instream = " << name << endl;
+    exit(1);
+    /* no input... */
+    /* *outstream = new AMI_STREAM<T>(); */
+
+  } else if(runList->length() == 1) {    
     //if 1 run only
     runList->dequeue(&name);
+    //printf("SORT: %s\n", name); fflush(stdout); 
     *outstream = new AMI_STREAM<T>(name);
     delete name; //should be safe, stream makes its own copy
-  } else {
+    
+  } else {						
+    /* many runs */
     *outstream = multiMerge<T,Compare>(runList,  cmp);
     //i thought the templates are not needed in the call, but seems to
     //help the compiler..laura

Modified: grass/trunk/include/iostream/ami_sort_impl.h
===================================================================
--- grass/trunk/include/iostream/ami_sort_impl.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/ami_sort_impl.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,7 @@
+
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -36,7 +37,6 @@
 #define BLOCKED_RUN 
 
 
-
 /* ---------------------------------------------------------------------- */
 //set run_size, last_run_size and nb_runs depending on how much memory
 //is available
@@ -45,16 +45,18 @@
 initializeRunFormation(AMI_STREAM<T> *instream,
 		       size_t &run_size, size_t &last_run_size, 
 		       unsigned int &nb_runs) {
-  
-  off_t strlen = instream->stream_len();  
+
   size_t mm_avail = MM_manager.memory_available();
+  off_t strlen;
+
 #ifdef BLOCKED_RUN
   // not in place, can only use half memory 
   mm_avail = mm_avail/2;
 #endif
-  
   run_size = mm_avail/sizeof(T);
 
+  
+  strlen = instream->stream_len();  
   if (strlen == 0) {
     nb_runs = last_run_size = 0;
   } else {
@@ -66,6 +68,7 @@
       last_run_size = strlen % run_size;
     }
   }
+
   SDEBUG cout << "nb_runs=" << nb_runs 
 	      << ", run_size=" << run_size 
 	      << ", last_run_size=" << last_run_size
@@ -78,17 +81,19 @@
 /* data is allocated; read run_size elements from stream into data and
    sort them using quicksort */
 template<class T, class Compare>
-void makeRun_Block(AMI_STREAM<T> *instream, T* data, 
+size_t makeRun_Block(AMI_STREAM<T> *instream, T* data, 
 		   unsigned int run_size, Compare *cmp) {
   AMI_err err;
-  
+  off_t new_run_size;
+
   //read next run from input stream
-  err = instream->read_array(data, run_size); 
-  assert(err == AMI_ERROR_NO_ERROR);
-  
+  err = instream->read_array(data, run_size, &new_run_size); 
+  assert(err == AMI_ERROR_NO_ERROR || err == AMI_ERROR_END_OF_STREAM);
+
   //sort it in memory in place
-  quicksort(data, run_size, *cmp);
+  quicksort(data, new_run_size, *cmp);
   
+  return new_run_size;
 }
 
 
@@ -105,6 +110,7 @@
   
   unsigned int nblocks, last_block_size, crt_block_size, block_size; 
 
+
   block_size = STREAM_BUFFER_SIZE;
 
   if (run_size % block_size == 0) {
@@ -178,16 +184,18 @@
   SDEBUG cout << "runFormation: ";
   SDEBUG MM_manager.print();
   
+  /* leave this in for now, in case some file-based implementations do
+     anything funny... -RW */
   //rewind file
   instream->seek(0); //should check error xxx
-  
+
   //estimate run_size, last_run_size and nb_runs
   initializeRunFormation(instream, run_size, last_run_size, nb_runs);
-  
-  //create runList
+
+  //create runList (if 0 size, queue uses default)
   runList = new queue<char*>(nb_runs);
-  
-  //allocate space for a run
+
+  /* allocate space for a run */
   if (nb_runs <= 1) {
     //don't waste space if input stream is smaller than run_size
     data = new T[last_run_size];
@@ -195,18 +203,20 @@
     data = new T[run_size];
   }
   SDEBUG MM_manager.print();
-  
-  for (size_t i=0; i< nb_runs; i++) {
+ 
+  //for (size_t i=0; i< nb_runs; i++) {
+  while(!instream->eof()) {
+    //crt_run_size = (i == nb_runs-1) ? last_run_size: run_size;
     
-    crt_run_size = (i == nb_runs-1) ? last_run_size: run_size;
-    
     //SDEBUG cout << "i=" << i << ":  runsize=" << crt_run_size << ", ";  
 
-#ifdef BLOCKED_RUN
-    makeRun(instream, data, crt_run_size, cmp);
-#else        
-    makeRun_Block(instream, data, crt_run_size, cmp);
-#endif
+    crt_run_size = makeRun_Block(instream, data, run_size, cmp);
+/* #ifdef BLOCKED_RUN */
+/*     makeRun(instream, data, crt_run_size, cmp); */
+/* #else         */
+/*     makeRun_Block(instream, data, crt_run_size, cmp); */
+/* #endif */
+
     SDEBUG MM_manager.print();
 
     //read next run from input stream
@@ -214,19 +224,21 @@
     //assert(err == AMI_ERROR_NO_ERROR);
     //sort it in memory in place
     //quicksort(data, crt_run_size, *cmp);
+
+    if(crt_run_size > 0) {
+      //create a new stream to hold this run 
+      str = new AMI_STREAM<T>();
+      str->write_array(data, crt_run_size);
+      assert(str->stream_len() == crt_run_size);
     
-    //create a new stream to hold this run 
-    str = new AMI_STREAM<T>();
-    str->write_array(data, crt_run_size);
-    assert(str->stream_len() == crt_run_size);
-    
-    //remember this run's name
-    str->name(&strname);
-    runList->enqueue(strname);
-    
-    //delete the stream -- should not keep too many streams open
-    str->persist(PERSIST_PERSISTENT);
-    delete str;
+      //remember this run's name
+      str->name(&strname);	/* deleted after we dequeue */
+      runList->enqueue(strname);
+      //delete the stream -- should not keep too many streams open
+      str->persist(PERSIST_PERSISTENT);
+      delete str;
+    }
+
   };
   SDEBUG MM_manager.print();
   //release the run memory!
@@ -263,7 +275,8 @@
 
 template<class T, class Compare>
 AMI_STREAM<T>* 
-singleMerge(queue<char*>* streamList, Compare *cmp) {
+singleMerge(queue<char*>* streamList, Compare *cmp)
+{
   AMI_STREAM<T>* mergedStr;
   size_t mm_avail, blocksize;
   unsigned int arity, max_arity; 
@@ -275,18 +288,26 @@
 
   //estimate max possible merge arity with available memory (approx M/B)
   mm_avail = MM_manager.memory_available();
-  blocksize = getpagesize();
+  //blocksize = getpagesize();
   //should use AMI function, but there's no stream at this point
-  // AMI_STREAM<T>::main_memory_usage(&blocksize, MM_STREAM_USAGE_BUFFER);
-  max_arity = mm_avail/blocksize;
+  //now use static mtd -RW 5/05
+  AMI_STREAM<T>::main_memory_usage(&blocksize, MM_STREAM_USAGE_MAXIMUM);
+  max_arity = mm_avail / blocksize;
+  if(max_arity < 2) {
+	cerr << __FILE__ ":" << __LINE__ << ": OUT OF MEMORY in singleMerge (going over limit)" << endl;
+	max_arity = 2;
+  } else if(max_arity > MAX_STREAMS_OPEN) {
+	max_arity = MAX_STREAMS_OPEN;
+  }
   arity = (streamList->length() < max_arity) ? 
     streamList->length() :  max_arity;
 
   SDEBUG cout << "arity=" << arity << " (max_arity=" <<max_arity<< ")\n"; 
-  
+
+  /* create the output stream. if this is a complete merge, use finalpath */
   //create output stream
   mergedStr = new AMI_STREAM<T>();
-  
+
   ReplacementHeap<T,Compare> rheap(arity, streamList);
   SDEBUG rheap.print(cerr);
 
@@ -325,7 +346,8 @@
 
 template<class T, class Compare>
 AMI_STREAM<T>* 
-multiMerge(queue<char*>* runList, Compare *cmp) {
+multiMerge(queue<char*>* runList, Compare *cmp)
+{
   AMI_STREAM<T> * mergedStr= NULL;
   char* path;
   

Modified: grass/trunk/include/iostream/ami_stream.h
===================================================================
--- grass/trunk/include/iostream/ami_stream.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/ami_stream.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -31,15 +31,12 @@
 #include <unistd.h>
 
 #include <iostream>
-#include <cstring>
 using namespace std;
 
+#define MAX_STREAMS_OPEN 200
+
 #include "mm.h" // Get the memory manager.
 
-#ifdef __MINGW32__
-#define getpagesize() (4096)
-#endif
-
 #define DEBUG_DELETE if(0)
 
 // The name of the environment variable which keeps the name of the
@@ -49,7 +46,7 @@
 // All streams will be names STREAM_*****
 #define BASE_NAME "STREAM"
 
-#define STREAM_BUFFER_SIZE (1<<15)
+#define STREAM_BUFFER_SIZE (1<<18)
 
 
 //
@@ -71,6 +68,7 @@
   AMI_ERROR_NO_MAIN_MEMORY_OPERATION,
 };
 
+extern char *ami_str_error[];
 
 //
 // AMI stream types passed to constructors
@@ -79,7 +77,8 @@
     AMI_READ_STREAM = 1,	// Open existing stream for reading
     AMI_WRITE_STREAM,		// Open for writing.  Create if non-existent
     AMI_APPEND_STREAM,		// Open for writing at end.  Create if needed.
-    AMI_READ_WRITE_STREAM	// Open to read and write.
+    AMI_READ_WRITE_STREAM,	// Open to read and write.
+	AMI_APPEND_WRITE_STREAM // Open for writing at end (write only mode).
 };
 
 
@@ -94,13 +93,11 @@
   PERSIST_READ_ONCE
 };
 
-
-
-template<class T> 
-class AMI_STREAM {
-private:
+/* an un-templated version makes for easier debugging */
+class UntypedStream {
+protected:
   FILE * fp;
-  //int fd;	//descriptor of file
+  int fildes;	//descriptor of file
   AMI_stream_type  access_mode;
   char path[BUFSIZ];
   persistence per;
@@ -117,35 +114,45 @@
 
   //stream buffer passed in the call to setvbuf when file is opened
   char* buf;
+  int eof_reached;
 
+ public:
+  static unsigned int get_block_length()  {
+    return STREAM_BUFFER_SIZE;
+    //return getpagesize();
+  };
+
+};
+
+template<class T> 
+class AMI_STREAM : public UntypedStream {
+
 protected:
-  unsigned int get_block_length();
-  
+
+  T read_tmp;					/* this is ugly... RW */
+
 public:
-   T read_tmp;
-
   // An AMI_stream with default name
   AMI_STREAM();
   
   // An AMI stream based on a specific path name.
-  AMI_STREAM(const char *path_name, 
-	     //	     AMI_stream_type st = AMI_READ_WRITE_STREAM);
-	     AMI_stream_type st);
+  AMI_STREAM(const char *path_name, AMI_stream_type st);
+
+  // convenience function with split path_name
+  //AMI_STREAM(const char *dir_name, const char *file_name, AMI_stream_type st);
   
+  
   // A psuedo-constructor for substreams.
   AMI_err new_substream(AMI_stream_type st, off_t sub_begin, off_t sub_end,
 			AMI_STREAM<T> **sub_stream);
-  
+
   // Destructor
   ~AMI_STREAM(void);
   
   // Read and write elements.
   AMI_err read_item(T **elt);
-
   AMI_err write_item(const T &elt);
-  
-  AMI_err read_array(T *data, off_t len);
-  
+  AMI_err read_array(T *data, off_t len, off_t *lenp);
   AMI_err write_array(const T *data, off_t len);
   
   // Return the number of items in the stream.
@@ -153,32 +160,34 @@
   
   // Return the path name of this stream.
   AMI_err name(char **stream_name);
+  const char* name() const;
   
   // Move to a specific item in the stream.
   AMI_err seek(off_t offset);
-  
+
   // Query memory usage
-  AMI_err main_memory_usage(size_t *usage,
+  static AMI_err main_memory_usage(size_t *usage,
 			    //MM_stream_usage usage_type= MM_STREAM_USAGE_OVERHEAD);
 			    MM_stream_usage usage_type);
   
   void persist(persistence p);
   
   char *sprint();
+
+  // have we hit the end of the stream
+  int eof();
 };
 
 
 /**********************************************************************/
-template<class T>
-unsigned int AMI_STREAM<T>::get_block_length() {
-  return getpagesize();
-}
 
 
 /**********************************************************************/
 /* creates a random file name, opens the file for reading and writing
    and and returns a file descriptor */
-int ami_single_temp_name(const std::string& base, char* tmp_path);
+/* int ami_single_temp_name(char *base, char* tmp_path); */
+/* fix from Andy Danner */
+int ami_single_temp_name(const std::string& base, char* tmp_path); 
 
 
 /**********************************************************************/
@@ -201,8 +210,8 @@
   
   access_mode = AMI_READ_WRITE_STREAM;
   int fd = ami_single_temp_name(BASE_NAME, path);
+  fildes = fd;
   fp = open_stream(fd, access_mode);
-
   
   /* a stream is by default buffered with a buffer of size BUFSIZ=1K */
   buf = new char[STREAM_BUFFER_SIZE];
@@ -218,8 +227,11 @@
   substream_level  = 0;
   logical_bos = logical_eos = -1;
 
+  // why is this here in the first place?? -RW
   seek(0);
 
+  eof_reached = 0;
+
   // Register memory usage before returning.
   //size_t usage; 
   //main_memory_usage(&usage,  MM_STREAM_USAGE_CURRENT);
@@ -233,12 +245,19 @@
 template<class T>
 AMI_STREAM<T>::AMI_STREAM(const char *path_name,
 			  AMI_stream_type st = AMI_READ_WRITE_STREAM) {
-  
+
   access_mode = st;
-  strcpy(path, path_name);
-  fp = open_stream(path, st);
-  
 
+  if(path_name == NULL) {
+	int fd = ami_single_temp_name(BASE_NAME, path);
+	fildes = fd;
+	fp = open_stream(fd, access_mode);
+  } else {
+	strcpy(path, path_name);
+	fp = open_stream(path, st);  
+	fildes = -1;
+  }
+
   /* a stream is by default buffered with a buffer of size BUFSIZ=1K */
   buf = new char[STREAM_BUFFER_SIZE];
   if (setvbuf(fp, buf, _IOFBF, STREAM_BUFFER_SIZE) != 0) {
@@ -246,9 +265,15 @@
     exit(1);
   }
 
+  eof_reached = 0;
+
   // By default, all streams are deleted at destruction time.
-  per = PERSIST_DELETE;
-  
+  if(st == AMI_READ_STREAM) {
+	per = PERSIST_PERSISTENT;
+  } else {
+	per = PERSIST_DELETE;
+  }
+
   // Not a substream.
   substream_level  = 0;
   logical_bos = logical_eos = -1;
@@ -303,12 +328,13 @@
   
   // Set the current position.
   substr->seek(0);
-  
+
+  substr->eof_reached = 0;
+
   //set substream level
   substr->substream_level = substream_level + 1;
 
-  //set persistence
-  substr->per = per;
+  substr->per = per;   //set persistence
 
   //*sub_stream = (AMI_base_stream < T > *)substr;
   *sub_stream = substr;
@@ -327,6 +353,7 @@
   struct stat buf;
   if (stat(path, &buf) == -1) {
     perror("AMI_STREAM::stream_len(): fstat failed ");
+	perror(path);
     assert(0);
     exit(1);
   }
@@ -349,8 +376,15 @@
   return AMI_ERROR_NO_ERROR;
 };
 
+// Return the path name of this stream.
+template<class T>
+const char *
+AMI_STREAM<T>::name() const {
+  return path;
+};
 
 
+
 /**********************************************************************/
 // Move to a specific offset within the (sub)stream.
 template<class T>
@@ -358,8 +392,7 @@
 
   off_t seek_offset;
   
-  if (substream_level) {
-    //substream
+  if (substream_level) {    //substream
     if (offset  > (unsigned) (logical_eos - logical_bos)) {
       //offset out of range
       cerr << "AMI_STREAM::seek bos=" << logical_bos << ", eos=" << logical_eos
@@ -377,8 +410,8 @@
     seek_offset = offset * sizeof(T);
   }
 
-  if (fseek(fp, seek_offset, SEEK_SET) == -1) {
-    cerr << "AMI_STREAM::seek offset=" << seek_offset << "failed.\n";
+  if (fseeko(fp, seek_offset, SEEK_SET) == -1) {
+    cerr << "AMI_STREAM::seek offset=" << seek_offset << " failed.\n";
     assert(0);
     exit(1);
   }
@@ -392,12 +425,13 @@
 /**********************************************************************/
 // Query memory usage
 template<class T>
-AMI_err AMI_STREAM<T>::main_memory_usage(size_t *usage,
-					 MM_stream_usage usage_type= MM_STREAM_USAGE_OVERHEAD) {
+AMI_err
+AMI_STREAM<T>::main_memory_usage(size_t *usage,
+								 MM_stream_usage usage_type= MM_STREAM_USAGE_OVERHEAD) {
   
    switch (usage_type) {
    case MM_STREAM_USAGE_OVERHEAD:
-     *usage = sizeof (*this);
+     *usage = sizeof (AMI_STREAM<T>);
      break;
    case MM_STREAM_USAGE_BUFFER:
      // *usage = get_block_length();
@@ -406,7 +440,7 @@
    case MM_STREAM_USAGE_CURRENT:
    case MM_STREAM_USAGE_MAXIMUM:
      // *usage = sizeof (*this) + get_block_length();
-     *usage = sizeof (*this) + STREAM_BUFFER_SIZE*sizeof(char);
+     *usage = sizeof (AMI_STREAM<T>) + STREAM_BUFFER_SIZE*sizeof(char);
      break;
    }
    return AMI_ERROR_NO_ERROR;
@@ -417,15 +451,15 @@
 /**********************************************************************/
 template<class T>
 AMI_STREAM<T>::~AMI_STREAM(void)  {
-
+  
   DEBUG_DELETE cerr << "~AMI_STREAM: " << path << "(" << this << ")\n";
-  delete buf;
   assert(fp);
   fclose(fp);
+  delete buf;
   
   // Get rid of the file if not persistent and if not substream.
   if ((per != PERSIST_PERSISTENT) && (substream_level == 0)) {
-    if (remove(path) == -1) {
+    if (unlink(path) == -1) {
       cerr << "AMI_STREAM: failed to unlink " << path << endl;
       perror("cannot unlink ");
       assert(0);
@@ -445,16 +479,21 @@
 AMI_err AMI_STREAM<T>::read_item(T **elt)  {
 
   assert(fp);
+
   //if we go past substream range
   if ((logical_eos >= 0) && ftell(fp) >= sizeof(T) * logical_eos) {
     return AMI_ERROR_END_OF_STREAM;
   
   } else {
     if (fread((char *) (&read_tmp), sizeof(T), 1, fp) < 1) {
-      //cerr << "file=" << path << ":";
-      //perror("cannot read!");
-      //assume EOF --should fix this XXX
-      return AMI_ERROR_END_OF_STREAM;
+      if(feof(fp)) {
+	eof_reached = 1;
+	return AMI_ERROR_END_OF_STREAM;
+      } else {
+	cerr << "file=" << path << ":";
+	perror("cannot read!");    
+	return AMI_ERROR_IO_ERROR;
+      }
     }
     
     *elt = &read_tmp;
@@ -467,21 +506,30 @@
 
 /**********************************************************************/
 template<class T>
-AMI_err AMI_STREAM<T>::read_array(T *data, off_t len) {
-  
+AMI_err AMI_STREAM<T>::read_array(T *data, off_t len, off_t *lenp=NULL) {
+  size_t nobj;
   assert(fp);
   
   //if we go past substream range
   if ((logical_eos >= 0) && ftell(fp) >= sizeof(T) * logical_eos) {
+	eof_reached = 1;
     return AMI_ERROR_END_OF_STREAM;
     
   } else {
-	if (fread((void*)data, sizeof(T), len, fp) < len) {
-	  cerr << "file=" << path << ":";
-	  perror("cannot read!");    
-	  //assume EOF --should fix this XXX
-	  return AMI_ERROR_END_OF_STREAM;
-	}
+    nobj = fread((void*)data, sizeof(T), len, fp);
+
+    if (nobj < len) {		/* some kind of error */
+      if(feof(fp)) {
+	if(lenp) *lenp = nobj;
+	eof_reached = 1;
+	return AMI_ERROR_END_OF_STREAM;
+      } else {
+	cerr << "file=" << path << ":";
+	perror("cannot read!");    
+	return AMI_ERROR_IO_ERROR;
+      }
+    }
+    if(lenp) *lenp = nobj;
     return AMI_ERROR_NO_ERROR; 
   }
 };
@@ -501,9 +549,11 @@
   } else {
     if (fwrite((char*)(&elt), sizeof(T), 1,fp) < 1) {
       cerr << "AMI_STREAM::write_item failed.\n";
+      perror(path);
       assert(0);
       exit(1);
     }
+
     return AMI_ERROR_NO_ERROR;
   }
 };
@@ -512,6 +562,7 @@
 /**********************************************************************/
 template<class T>
 AMI_err AMI_STREAM<T>::write_array(const T *data, off_t len) {
+  size_t nobj;
 
   assert(fp);
   //if we go past substream range
@@ -519,12 +570,13 @@
     return AMI_ERROR_END_OF_STREAM;
     
   } else {
-    if (fwrite(data, sizeof(T), len,fp) < len) {
+    nobj = fwrite(data, sizeof(T), len, fp);
+    if (nobj  < len) {
       cerr << "AMI_STREAM::write_item failed.\n";
       assert(0);
       exit(1);
     }
-    return AMI_ERROR_NO_ERROR;
+   return AMI_ERROR_NO_ERROR;
   }
 };
         
@@ -532,7 +584,7 @@
 /**********************************************************************/
 template<class T>
 void AMI_STREAM<T>::persist(persistence p)  {
-   per = p;
+  per = p;
 };
 
 
@@ -551,4 +603,11 @@
   return buf;
 };
 
+/**********************************************************************/
+template<class T>
+int AMI_STREAM<T>::eof()  {
+  return eof_reached;
+};
+
+
 #endif // _AMI_STREAM_H 

Modified: grass/trunk/include/iostream/embuffer.h
===================================================================
--- grass/trunk/include/iostream/embuffer.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/embuffer.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,7 @@
+
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -21,9 +22,9 @@
 
 
 #include <stdio.h>
-#include <math.h>
 #include <assert.h>
 #include <stdlib.h>
+#include <math.h>
 
 #include "ami_config.h" //for SAVE_MEMORY
 #include "ami_stream.h"
@@ -293,7 +294,7 @@
   unsigned long get_stream_maxlen() const {
     return (unsigned long)pow((double)arity,(double)level-1)*basesize;
   }
-
+  
   //return the actual size of stream i; i must be the index of a valid
   //stream
   unsigned long get_stream_len(unsigned int i) {
@@ -439,9 +440,7 @@
 	  arity, (long)(arity*sizeof(AMI_STREAM<T>*)));
   MEMORY_LOG(str);
   //allocate STREAM* array
-  //GCC-3.4 does not allow (TYPE)[array] 
-  //use TYPE[array]
-  data = new AMI_STREAM<T>*[arity];
+  data = new AMI_STREAM<T>* [arity];
    
   //allocate deleted array
   sprintf(str, "em_buffer: allocate deleted array: %ld\n",
@@ -460,10 +459,7 @@
   sprintf(str, "em_buffer: allocate name array: %ld\n",
 		  (long)(arity*sizeof(char*)));
   MEMORY_LOG(str);
-  //GCC-3.4 does not allow (TYPE)[array] 
-  //use TYPE[array]
-  //name = new (char*)[arity];
-  name = new char*[arity];
+  name = new char* [arity];
   assert(name);
 #endif
 
@@ -805,6 +801,7 @@
       
 #ifdef SAVE_MEMORY
       //stream is empty ==> delete its name 
+	  assert(name[i]);
       delete name[i];
       name[i] = NULL;
 #endif
@@ -831,27 +828,28 @@
     for (i=0; i<index; i++) {
       //if i'th stream is not empty, shift it left if necessary
       if (data[i]) {
-	if (i!=j) {
-	  //set j'th stream to point to i'th stream
-	  //cout << j << " set to " << i << endl; cout.flush();
-	  data[j] = data[i];
-	  deleted[j] = deleted[i];
-	  streamsize[j] = streamsize[i];
-	  //set i'th stream to point to NULL
-	  data[i] = NULL;
-	  deleted[i] = 0;
-	  streamsize[i] = 0;
+		if (i!=j) {
+		  //set j'th stream to point to i'th stream
+		  //cout << j << " set to " << i << endl; cout.flush();
+		  data[j] = data[i];
+		  deleted[j] = deleted[i];
+		  streamsize[j] = streamsize[i];
+		  //set i'th stream to point to NULL
+		  data[i] = NULL;
+		  deleted[i] = 0;
+		  streamsize[i] = 0;
 #ifdef SAVE_MEMORY
-	  //fix the names
-	  delete name[j];
-	  name[j] = name[i];
-	  name[i] = NULL;
-	  check_name(j);
+		  //fix the names
+/* 	already done	  assert(name[j]); */
+/* 		  delete name[j]; */
+		  name[j] = name[i];
+		  name[i] = NULL;
+		  check_name(j);
 #endif
-	}  else {
-	  //cout << i << " left the same" << endl;
-	}
-	j++;
+		}  else {
+		  //cout << i << " left the same" << endl;
+		}
+		j++;
       } //if data[i] != NULL
     }//for i
 
@@ -896,6 +894,7 @@
     assert(streamsize[i] == data[i]->stream_len()); 
 #ifdef SAVE_MEMORY   
     check_name(i);
+	assert(name[i]);
     delete name[i];
     name[i] = NULL;
 #endif
@@ -918,12 +917,13 @@
 //all streams of the buffer in sorted ascending order of 
 //their keys  (priorities); 
 template<class T, class Key>
-AMI_STREAM<T>* em_buffer<T,Key>::sort() {
+AMI_STREAM<T>* 
+em_buffer<T,Key>::sort() {
   
   //create stream
   MEMORY_LOG("em_buffer::sort: allocate new AMI_STREAM\n");
 
-  AMI_STREAM<T>* sorted_stream = new AMI_STREAM<T>();
+  AMI_STREAM<T>* sorted_stream = new AMI_STREAM<T>();  /* will be deleteed in insert() */
   assert(sorted_stream);
   
   //merge the streams into sorted stream

Modified: grass/trunk/include/iostream/empq.h
===================================================================
--- grass/trunk/include/iostream/empq.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/empq.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -16,6 +16,7 @@
  *
  *****************************************************************************/
 
+
 #ifndef __EMPQ_H
 #define __EMPQ_H
 
@@ -202,6 +203,9 @@
   //return maximum capacity of em_pqueue
   long maxlen();
 
+  // delete all the data in the pq; reset to empty but don't free memory
+  void clear();
+
   //print structure
   void print_range();
 

Modified: grass/trunk/include/iostream/empq_adaptive.h
===================================================================
--- grass/trunk/include/iostream/empq_adaptive.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/empq_adaptive.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,7 @@
+
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -27,7 +28,7 @@
 
 
 
-#define EMPQAD_DEBUG if(0)
+#define EMPQAD_DEBUG if(1)
 
 
 enum regim_type {
@@ -45,11 +46,13 @@
   MinMaxHeap<T> *im;
   em_pqueue<T,Key> *em;
   UnboundedMinMaxHeap<T> *dim;	// debug, internal memory pq
+  void initPQ(size_t);
 public:
   /* start in INMEM regim by allocating im of size precisely twice the
      size of the (pqueue within) the em_pqueue; */
   EMPQueueAdaptive(long N) : EMPQueueAdaptive() {};
   EMPQueueAdaptive();
+  EMPQueueAdaptive(size_t inMem);
   ~EMPQueueAdaptive();
 
   void makeExternal();
@@ -74,6 +77,8 @@
 
   long size() const; //return the nb of elements in the structure
 
+  void clear();			/* delete all contents of pq */
+
   void verify();
 };
 

Modified: grass/trunk/include/iostream/empq_adaptive_impl.h
===================================================================
--- grass/trunk/include/iostream/empq_adaptive_impl.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/empq_adaptive_impl.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -16,6 +16,7 @@
  *
  *****************************************************************************/
 
+
 #ifndef __EMPQ_ADAPTIVE_IMPL_H
 #define __EMPQ_ADAPTIVE_IMPL_H
 
@@ -28,10 +29,10 @@
 #include "mm_utils.h"
 #include "empq_adaptive.h"
 
+#include "ami_sort.h"
 
 
-//defined in "empqAdaptive.H"
-//#define EMPQAD_DEBUG if(0)
+// EMPQAD_DEBUG defined in "empqAdaptive.H"
 
 
 
@@ -39,8 +40,30 @@
 //------------------------------------------------------------
 //allocate an internal pqueue of size precisely twice 
 //the size of the pqueue within the em_pqueue;
+//
+//This constructor uses a user defined amount of memory
 
 template<class T, class Key> 
+EMPQueueAdaptive<T,Key>::EMPQueueAdaptive(size_t inMem) {
+  regim = INMEM;
+  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: starting in-memory pqueue" 
+		    << endl;
+  
+  //------------------------------------------------------------
+  //set the size precisely as in empq constructor since we cannot 
+  //really call the em__pqueue constructor, because we don't want 
+  //the space allocated; we just want the sizes;
+  //AMI_err ae;
+  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: desired memory: " 
+		    << ( (float)inMem/ (1<< 20)) << "MB" << endl;
+  
+  initPQ(inMem);
+};
+
+
+//------------------------------------------------------------
+// This more resembles the original constuctor which is greedy
+template<class T, class Key> 
 EMPQueueAdaptive<T,Key>::EMPQueueAdaptive() {
   regim = INMEM;
   EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: starting in-memory pqueue" 
@@ -50,11 +73,26 @@
   //set the size precisely as in empq constructor since we cannot 
   //really call the em__pqueue constructor, because we don't want 
   //the space allocated; we just want the sizes;
-  AMI_err ae;
   size_t mm_avail = getAvailableMemory();
   EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: available memory: " 
 		    << ( (float)mm_avail/ (1<< 20)) << "MB" << endl;
   
+
+  initPQ(mm_avail);
+
+};
+
+
+//------------------------------------------------------------
+// This metod initialized the PQ based on the memory passed 
+// into it
+template<class T, class Key>
+void
+EMPQueueAdaptive<T,Key>::initPQ(size_t initMem) {
+  AMI_err ae;
+  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: desired memory: " 
+		    << ( (float)initMem/ (1<< 20)) << "MB" << endl;
+  
   /* same calculations as empq constructor in order to estimate
      overhead memory; this is because we want to allocate a pqueue of
      size exactly double the size of the pqueue inside the empq;
@@ -70,26 +108,35 @@
     cerr << "EMPQueueAdaptive constr: failing to get stream_usage\n";
     exit(1);
   }  
+
+
   //account for temporary memory usage
   unsigned short max_nbuf = 2;
-  unsigned int buf_arity = mm_avail/(2 * sz_stream);
+  unsigned int buf_arity = initMem/(2 * sz_stream);
+  if (buf_arity > MAX_STREAMS_OPEN) buf_arity = MAX_STREAMS_OPEN; 
   unsigned long mm_overhead = buf_arity*sizeof(merge_key<Key>) + 
     max_nbuf * sizeof(em_buffer<T,Key>) +
     2*sz_stream + max_nbuf*sz_stream;
   mm_overhead *= 8; //overestimate..this should be fixed with
   //a precise accounting of the extra memory required
+
+  EMPQAD_DEBUG cout << "sz_stream: " << sz_stream << " buf_arity: " << buf_arity <<
+    " mm_overhead: " << mm_overhead << " mm_avail: " << initMem << ".\n";
+
+
+
   EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: memory overhead set to " 
 		    << ((float)mm_overhead / (1 << 20)) << "MB" << endl;
-  if (mm_overhead > mm_avail) {
-    cerr << "overhead bigger than available memory ("<< mm_avail << "); "
+  if (mm_overhead > initMem) {
+    cerr << "overhead bigger than available memory ("<< initMem << "); "
 	 << "increase -m and try again\n";
     exit(1);
   }
-  mm_avail -= mm_overhead;
+  initMem -= mm_overhead;
   //------------------------------------------------------------
 
 
-  long pqsize = mm_avail/sizeof(T);
+  long pqsize = initMem/sizeof(T);
   EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: pqsize set to " << pqsize << endl;
 
   //initialize in memory pqueue and set em to NULL
@@ -99,8 +146,6 @@
 };
 
 
-
-
 template<class T, class Key> 
 EMPQueueAdaptive<T,Key>::~EMPQueueAdaptive() {
   switch(regim) {
@@ -123,7 +168,7 @@
 template<class T, class Key> 
 long 
 EMPQueueAdaptive<T,Key>::maxlen() const {
-  long m;
+  long m=-1;
   switch(regim) {
   case INMEM:
 	assert(im);
@@ -213,8 +258,26 @@
   return v;
 };
 
+/* switch over to using an external priority queue */
 template<class T, class Key> 
 void
+EMPQueueAdaptive<T,Key>::clear() {
+  switch(regim) {
+  case INMEM:
+    im->clear();
+    break;
+  case EXTMEM:
+    em->clear();
+    break;
+  case EXTMEM_DEBUG:
+    dim->clear();
+    break;
+  }
+}
+
+
+template<class T, class Key> 
+void
 EMPQueueAdaptive<T,Key>::verify() {
   switch(regim) {
   case INMEM:
@@ -246,7 +309,7 @@
   case EXTMEM_DEBUG:
 	v1 =  dim->extract_all_min(tmp);
 	v = em->extract_all_min(elt);
-	assert(dim->BasicMinMaxHeap<T>::size() == em->size());
+	assert(dim->size() == em->size());
 	assert(v == v1);
 	assert(tmp == elt);
 	break;
@@ -269,7 +332,7 @@
 	v = em->size();
 	break;
   case EXTMEM_DEBUG:
-	v1 = dim->BasicMinMaxHeap<T>::size();
+	v1 = dim->size();
 	v = em->size();
 	assert(v == v1);
 	break;
@@ -300,7 +363,7 @@
 	  v = em->extract_min(elt);
 	  assert(v == v1);
 	  assert(tmp == elt);
-	  assert(dim->BasicMinMaxHeap<T>::size() == em->size());
+	  assert(dim->size() == em->size());
 	  break;
     }
 	return v;
@@ -334,7 +397,7 @@
   case EXTMEM_DEBUG:
 	dim->insert(elt);
 	v = em->insert(elt);
-	assert(dim->BasicMinMaxHeap<T>::size() == em->size());
+	assert(dim->size() == em->size());
 	break;
   }
   return v;

Modified: grass/trunk/include/iostream/empq_impl.h
===================================================================
--- grass/trunk/include/iostream/empq_impl.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/empq_impl.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,7 @@
+
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -19,20 +20,11 @@
 #ifndef __EMPQ_IMPL_H
 #define __EMPQ_IMPL_H
 
-#include <stdio.h>
-
-#if __GNUC__ > 3 || (__GNUC__ == 3 && __GNUC_MINOR__ >= 1)
 #include <ostream>
-#else
-#include <ostream.h>
-#endif
-
 using namespace std;
 
 #include "empq.h"
 
-
-
 #if(0)
 #include "option.H"
 #define MY_LOG_DEBUG_ID(x) \
@@ -136,10 +128,12 @@
   //ESTIMATE AVAILABLE MEMORY BEFORE ALLOCATION
   AMI_err ae;
   size_t mm_avail = getAvailableMemory();
-  printf("EM_PQUEUE:available memory before allocation: %.2fMB\n", 
-	       mm_avail/(float)(1<<20));
-  printf("EM_PQUEUE:available memory before allocation: %ldB\n", 
-	       mm_avail);
+  printf("EM_PQUEUE:available memory before allocation: %.2fMB\n",
+ 	       mm_avail/(float)(1<<20));
+  printf("EM_PQUEUE:available memory before allocation: %ldB\n",
+		 mm_avail);
+
+
   //____________________________________________________________
   //ALLOCATE STRUCTURE
    //some dummy checks..
@@ -357,51 +351,59 @@
   assert(im && amis);
 
   pqcapacity = im->get_maxsize()/2; // we think this memory is now available
+  pqsize = pqcapacity + 1; //truncate errors  
   pqcurrentsize = im->size();
-  pqsize = pqcapacity;
-
+  //assert( pqcurrentsize <= pqsize); 
+  if(!(pqcurrentsize <= pqsize)) {
+    cout << "EMPQ: pq maxsize=" << pqsize <<", pq crtsize=" << pqcurrentsize
+	 << "\n";
+    assert(0);
+    exit(1);
+  }
+  
+  
   LOG_avail_memo();
- 
+  
   /* at this point im is allocated all memory, but it is only at most
      half full; we need to relocate im to half space and to allocate
      buff_0 the other half; since we use new, there is no realloc, so
      we will copy to a file...*/
-
+  
   {
-	//copy im to a stream and free its memory
-	T x;
-	AMI_STREAM<T> tmpstr;
-	for (unsigned int i=0; i<pqcurrentsize; i++) {
-	  im->extract_min(x);
-    ae = tmpstr.write_item(x);
+    //copy im to a stream and free its memory
+    T x;
+    AMI_STREAM<T> tmpstr;
+    for (unsigned int i=0; i<pqcurrentsize; i++) {
+      im->extract_min(x);
+      ae = tmpstr.write_item(x);
+      assert(ae == AMI_ERROR_NO_ERROR);
+    }
+    delete im; im = NULL;
+    LOG_avail_memo();
+    
+    //allocate pq and buff_0 half size
+    bufsize = pqcapacity;
+    cout << "EM_PQUEUE: allocating im_buffer size=" << bufsize
+	 << " total " << (float)bufsize*sizeof(T)/(1<<20) << "MB\n"; 
+    cout.flush();
+    buff_0 = new im_buffer<T>(bufsize);
+    assert(buff_0);
+    cout << "EM_PQUEUE: allocating pq size=" << pqsize
+	 << " total " << (float)pqcapacity*sizeof(T)/(1<<20)  << "MB\n"; 
+    cout.flush();
+    pq = new MinMaxHeap<T>(pqsize);
+    assert(pq);
+    
+    //fill pq from tmp stream
+    ae =  tmpstr.seek(0);
     assert(ae == AMI_ERROR_NO_ERROR);
-	}
-	delete im;
-	LOG_avail_memo();
-	
-	//allocate pq and buff_0 half size
-	bufsize = pqcapacity;
-	cout << "EM_PQUEUE: allocating im_buffer size=" << bufsize
-		 << " total " << (float)bufsize*sizeof(T)/(1<<20) << "MB\n"; 
-	cout.flush();
-	buff_0 = new im_buffer<T>(bufsize);
-	assert(buff_0);
-	cout << "EM_PQUEUE: allocating pq size=" << pqcapacity
-		 << " total " << (float)pqcapacity*sizeof(T)/(1<<20)  << "MB\n"; 
-	cout.flush();
-	pq = new MinMaxHeap<T>(pqcapacity);
-	assert(pq);
-	
-	//fill pq from tmp stream
-	ae =  tmpstr.seek(0);
-	assert(ae == AMI_ERROR_NO_ERROR);
-	T *elt;
-	for (unsigned int i=0; i<pqcurrentsize; i++) {
-	  ae = tmpstr.read_item(&elt);
-	  assert(ae == AMI_ERROR_NO_ERROR);
-	  pq->insert(*elt);
-	}
-	assert(pq->size() == pqcurrentsize);
+    T *elt;
+    for (unsigned int i=0; i<pqcurrentsize; i++) {
+      ae = tmpstr.read_item(&elt);
+      assert(ae == AMI_ERROR_NO_ERROR);
+      pq->insert(*elt);
+    }
+    assert(pq->size() == pqcurrentsize);
   }
 
   //estimate buf_arity
@@ -430,6 +432,11 @@
      buf_arity = 1;
    }
 
+   //added on 05/16/2005 by Laura
+   if (buf_arity > MAX_STREAMS_OPEN) {
+	 buf_arity = MAX_STREAMS_OPEN;
+   }
+
   //allocate ext memory buffer array
   char str[200];
   sprintf(str,"em_pqueue: allocating array of %ld buff pointers\n",
@@ -475,9 +482,13 @@
 template<class T, class Key>
 em_pqueue<T,Key>::~em_pqueue() {
   //delete in memory pqueue
-  if (pq) delete pq;
+  if (pq) {
+	delete pq; pq = NULL;
+  }
   //delete in memory buffer
-  if (buff_0) delete buff_0;
+  if (buff_0) {
+	delete buff_0; buff_0 = NULL;
+  }
   //delete ext memory buffers
   for (unsigned short i=0; i< crt_buf; i++) {
     if (buff[i]) delete buff[i];
@@ -582,10 +593,9 @@
   }
   //merge pqsize smallest elements from each buffer into a new stream
   ExtendedMergeStream** outstreams;
-  //gcc-3.4 doesn't allows (TYPE*)[SIZE] declarations
-  //use TYPE*[SIZE]
-  outstreams = new ExtendedMergeStream*[crt_buf];
+  outstreams = new ExtendedMergeStream* [crt_buf];
 
+  /* gets stream of smallest pqsize elts from each level */
   for (unsigned short i=0; i< crt_buf; i++) {
     MY_LOG_DEBUG_ID(crt_buf);
 	outstreams[i] = new ExtendedMergeStream();
@@ -596,6 +606,7 @@
     //print_stream(outstreams[i]); cout.flush();
   }
   
+  /* merge above streams into pqsize elts in minstream */
   if (crt_buf == 1) {
     //just one level; make common case faster :)
     merge_bufs2pq(outstreams[0]);
@@ -607,20 +618,21 @@
     //cout << "merging streams\n";
     ae = merge_streams(outstreams, crt_buf, minstream, pqsize);
     assert(ae == AMI_ERROR_NO_ERROR);
-	for (unsigned short i=0; i< crt_buf; i++) {
+	for (int i=0; i< crt_buf; i++) {
 	  delete outstreams[i];
 	}
     delete [] outstreams;
-    
+
     //copy the minstream in the internal pqueue while merging with buff_0;
     //the smallest <pqsize> elements between minstream and buff_0 will be
     //inserted in internal pqueue;
     //also, the elements from minstram which are inserted in pqueue must be
     //marked as deleted in the source streams;
     merge_bufs2pq(minstream);
-	delete minstream;
+	delete minstream; minstream = NULL;
     //cout << "after merge_bufs2pq: \n" << *this << "\n";
   }
+
   XXX assert(pq->size());
   XXX cerr << "fillpq done" << endl;
   return true;
@@ -969,7 +981,7 @@
 #endif
   T val = x;
 
-  MY_LOG_DEBUG_ID("\n\n\nEM_PQUEUE::INSERT");
+  MY_LOG_DEBUG_ID("\nEM_PQUEUE::INSERT");
   //if structure is empty insert x in pq; not worth the trouble..
   if ((crt_buf == 0) && (buff_0->is_empty())) {
     if (!pq->full()) {
@@ -1244,6 +1256,7 @@
     assert(instreams[i]);
     //rewind stream
     if ((ami_err = instreams[i]->seek(bos[i])) != AMI_ERROR_NO_ERROR) {
+	  cerr << "WARNING!!! EARLY EXIT!!!" << endl;
       return ami_err;
     }
     /* read first item */
@@ -1261,6 +1274,7 @@
       j++; 
 	  break;
 	default:
+	  cerr << "WARNING!!! EARLY EXIT!!!" << endl;
 	  return ami_err;
 	}
   }
@@ -1283,6 +1297,7 @@
     //write min item to output stream
 	out = ExtendedEltMergeType<T,Key>(*in_objects[i], bufid, i);
     if ((ami_err = outstream->write_item(out)) != AMI_ERROR_NO_ERROR) {
+	  cerr << "WARNING!!! EARLY EXIT!!!" << endl;
       return ami_err;
     }
     //cout << "wrote " << out << "\n";
@@ -1302,6 +1317,7 @@
 	  }
 	  break;
 	default:
+	  cerr << "WARNING!!! early breakout!!!" << endl;
 	  return ami_err;
     }
     //cout << "PQ: " << mergeheap << "\n";
@@ -1341,68 +1357,80 @@
   assert(arity> 1);
     
   //Pointers to current leading elements of streams
-  ExtendedEltMergeType<T,Key>* in_objects[arity];
+  ExtendedEltMergeType<T,Key> in_objects[arity];
+
   AMI_err ami_err;
-  unsigned int i;
+  //unsigned int i;
   unsigned int nonEmptyRuns=0;   //count number of non-empty runs
  
   //array initialized with first element from each stream (only non-null keys 
   //must be included)
   MEMORY_LOG("em_pqueue::merge_streams: allocate keys array\n");
+
   merge_key<Key>* keys = new merge_key<Key> [arity];
   assert(keys);
 
   //rewind and read the first item from every stream
-  for (i = 0; i < arity ; i++ ) {
+  for (int i = 0; i < arity ; i++ ) {
     //rewind stream
     if ((ami_err = instreams[i]->seek(0)) != AMI_ERROR_NO_ERROR) {
       return ami_err;
     }
     //read first item
-	ami_err = instreams[i]->read_item(&(in_objects[i]));
+	ExtendedEltMergeType<T,Key> *objp;
+	ami_err = instreams[i]->read_item(&objp);
 	switch(ami_err) {
 	case AMI_ERROR_NO_ERROR:
-      keys[nonEmptyRuns] = merge_key<Key>(in_objects[i]->getPriority(), i);
+	  in_objects[i] = *objp;
+      keys[nonEmptyRuns] = merge_key<Key>(in_objects[i].getPriority(), i);
 	  nonEmptyRuns++; 
 	  break;
 	case AMI_ERROR_END_OF_STREAM:
-	  in_objects[i] = NULL;
 	  break;
 	default:
 	  return ami_err;
 	}
   }
-
+ assert(nonEmptyRuns <= arity); 
+ 
   //build heap from the array of keys 
-  pqheap_t1<merge_key<Key> > mergeheap(keys, nonEmptyRuns);
+  pqheap_t1<merge_key<Key> > mergeheap(keys, nonEmptyRuns);	/* takes ownership of keys */
 
   //repeatedly extract_min from heap and insert next item from same stream
   long extracted = 0;
   //rewind output buffer
   ami_err = outstream->seek(0);
   assert(ami_err == AMI_ERROR_NO_ERROR);
+
   while (!mergeheap.empty() && (extracted < K)) {
     //find min key and id of stream it comes from
-    i = mergeheap.min().stream_id();
+    int id = mergeheap.min().stream_id();
     //write min item to output stream
-    if ((ami_err = outstream->write_item(*in_objects[i])) 
-		!= AMI_ERROR_NO_ERROR) {
+	assert(id < nonEmptyRuns);
+	assert(id >= 0);
+	assert(mergeheap.size() == nonEmptyRuns);
+	ExtendedEltMergeType<T,Key> obj = in_objects[id];
+    if ((ami_err = outstream->write_item(obj)) != AMI_ERROR_NO_ERROR) {
       return ami_err;
     }
     //cout << "wrote " << *in_objects[i] << "\n";
 
     //extract the min from the heap and insert next key from same stream
-    ami_err = instreams[i]->read_item(&(in_objects[i]));
+	assert(id < nonEmptyRuns);
+	assert(id >= 0);
+	ExtendedEltMergeType<T,Key> *objp;
+    ami_err = instreams[id]->read_item(&objp);
 	switch(ami_err) {
 	case AMI_ERROR_NO_ERROR:
       {
-		merge_key<Key> tmp = merge_key<Key>(in_objects[i]->getPriority(), i);
+		in_objects[id] = *objp;
+		merge_key<Key> tmp = merge_key<Key>(in_objects[id].getPriority(), id);
 		mergeheap.delete_min_and_insert(tmp);
 	  }
 	  extracted++;    //update nb of extracted elements
 	  break;
 	case AMI_ERROR_END_OF_STREAM:
-	  in_objects[i] = NULL;
+	  mergeheap.delete_min();
 	  break;
 	default:
 	  return ami_err;
@@ -1415,7 +1443,6 @@
   //IF I DELETE KEYS EXPLICITELY, THEY WILL BE DELETED AGAIN BY
   //DESTRUCTOR, AND EVERYTHING SCREWS UP..
 
-
   MY_LOG_DEBUG_ID("merge_streams: done");
   return AMI_ERROR_NO_ERROR;
 }
@@ -1424,6 +1451,22 @@
 //************************************************************/
 template<class T, class Key>
 void
+em_pqueue<T,Key>::clear() {
+  pq->clear();
+  buff_0->clear();
+  
+  for(int i=0; i<crt_buf; i++) {
+    if(buff[i]) {
+      delete buff[i]; buff[i] = NULL;
+    }
+  }
+  crt_buf = 0;
+}
+
+
+//************************************************************/
+template<class T, class Key>
+void
 em_pqueue<T,Key>::print_range() {
   cout << "EM_PQ: [pq=" << pqsize
        << ", b=" << bufsize 

Modified: grass/trunk/include/iostream/imbuffer.h
===================================================================
--- grass/trunk/include/iostream/imbuffer.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/imbuffer.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -16,6 +16,7 @@
  *
  *****************************************************************************/
 
+
 #ifndef __IMBUFFER_H
 #define __IMBUFFER_H
 
@@ -146,6 +147,12 @@
 #endif
   }
 
+  //reset buffer (delete all data); don't delete memory
+  void clear() { 
+    size = 0; 
+    sorted = false;
+  }
+
   //reset buffer: keep n elements starting at position start
   void reset(unsigned long start, unsigned long n);
 

Modified: grass/trunk/include/iostream/mem_stream.h
===================================================================
--- grass/trunk/include/iostream/mem_stream.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/mem_stream.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,7 @@
+
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -84,7 +85,7 @@
 template<class T>
 AMI_err MEM_STREAM<T>::name(char **stream_name)  {
 
-  char const* path = "dummy";
+  const char *path = "dummy";
 
   *stream_name = new char [strlen(path) + 1];
   strcpy(*stream_name, path);

Modified: grass/trunk/include/iostream/minmaxheap.h
===================================================================
--- grass/trunk/include/iostream/minmaxheap.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/minmaxheap.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -65,16 +65,16 @@
 
 template <class T>
 class BasicMinMaxHeap {
-
 protected:  
   HeapIndex maxsize;
   HeapIndex lastindex;			// last used position (0 unused) (?)
   T *A;
+
+protected:
   /* couple of memory mgt functions to keep things consistent */
   static T *allocateHeap(HeapIndex n);
   static void freeHeap(T *);
-  virtual void grow()=0;
-  
+
 public:
   BasicMinMaxHeap(HeapIndex size) : maxsize(size) { 
     char str[100];
@@ -93,7 +93,10 @@
   };
 
   bool empty(void) const { return size() == 0; };
-  HeapIndex size() const;
+  HeapIndex size() const { 
+	assert(A ||  !lastindex);
+    return lastindex; 
+  };
 
   T get(HeapIndex i) const { assert(i <= size()); return A[i]; }
    
@@ -111,8 +114,10 @@
   bool extract_all_min(T& elt);
 
   void reset();
+  void clear();		/* mark all the data as deleted, but don't do free */
 
   void destructiveVerify();
+
   void verify();
   
   void print() const;
@@ -127,17 +132,18 @@
     return s;
   }
 
+protected:
+  virtual void grow()=0;
 
 private:
-  // Changed log2() to log2_() just in case log2() macro was already
-  // defined in math.h: e.g., log2() is defined in Cygwin gcc by default.
-  long log2_(long n) const;
-  int isOnMaxLevel(HeapIndex i) const { return (log2_(i) % 2); };
+  long log2(long n) const;
+  int isOnMaxLevel(HeapIndex i) const { return (log2(i) % 2); };
   int isOnMinLevel(HeapIndex i) const { return !isOnMaxLevel(i); };
 
   HeapIndex leftChild(HeapIndex i) const { return 2*i; };
   HeapIndex rightChild(HeapIndex i) const { return 2*i + 1; };
   int hasRightChild(HeapIndex i) const { return (rightChild(i) <= size()); };
+  int hasRightChild(HeapIndex i, HeapIndex *c) const { return ((*c=rightChild(i)) <= size()); };
   HeapIndex parent(HeapIndex i) const { return (i/2); };
   HeapIndex grandparent(HeapIndex i) const { return (i/4); };
   int hasChildren(HeapIndex i) const { return (2*i) <= size(); }; // 1 or more
@@ -164,20 +170,12 @@
 // index 0 is invalid
 // index <= size
 
-
 // ----------------------------------------------------------------------
-template <class T>
-HeapIndex BasicMinMaxHeap<T>::size() const { 
-	assert(A || !lastindex);
-  return lastindex; 
-}
 
-// ----------------------------------------------------------------------
-
 template <class T> 
-long BasicMinMaxHeap<T>::log2_(long n) const {
+long BasicMinMaxHeap<T>::log2(long n) const {
   long i=-1;
-  // let log2_(0)==-1
+  // let log2(0)==-1
   while(n) {
 	n = n >> 1;
 	i++;
@@ -262,8 +260,8 @@
   // p is smallest of left child, its grandchildren
   minpos = p;
 
-  if(hasRightChild(i)) {
-	p = rightChild(i);
+  if(hasRightChild(i,&p)) {
+	//p = rightChild(i);
 	if(hasChildren(p)) {
 	  q = smallestChild(p);
 	  if(A[p] > A[q]) p = q;
@@ -291,8 +289,8 @@
   // p is smallest of left child, its grandchildren
   maxpos = p;
 
-  if(hasRightChild(i)) {
-	p = rightChild(i);
+  if(hasRightChild(i,&p)) {
+	//p = rightChild(i);
 	if(hasChildren(p)) {
 	  q = largestChild(p);
 	  if(A[p] < A[q]) p = q;
@@ -624,7 +622,15 @@
 }
 
 // ----------------------------------------------------------------------
+
 template <class T> 
+void
+BasicMinMaxHeap<T>::clear() {
+  lastindex = 0;
+}
+
+// ----------------------------------------------------------------------
+template <class T> 
 T *
 BasicMinMaxHeap<T>::allocateHeap(HeapIndex n) {
   T *p;
@@ -708,17 +714,11 @@
 }
 
 
-
 // ----------------------------------------------------------------------
 // ----------------------------------------------------------------------
 
 template <class T>
 class MinMaxHeap : public BasicMinMaxHeap<T> {
-
-//using BasicMinMaxHeap<T>::maxsize;
-//using BasicMinMaxHeap<T>::lastindex;
-//using BasicMinMaxHeap<T>::size;
-
 public:
   MinMaxHeap(HeapIndex size) : BasicMinMaxHeap<T>(size) {};
   virtual ~MinMaxHeap() {};
@@ -727,7 +727,7 @@
   HeapIndex fill(T* arr, HeapIndex n);
   
 protected:
-  virtual void grow() { assert(0); exit(1); };
+  virtual void grow() { fprintf(stderr, "MinMaxHeap::grow: not implemented\n"); assert(0); exit(1); };
 };
 
 // ----------------------------------------------------------------------
@@ -738,12 +738,12 @@
 HeapIndex MinMaxHeap<T>::fill(T* arr, HeapIndex n) {
   HeapIndex i;
   //heap must be empty
-  assert(get_maxsize()==0);
+  assert(this->size()==0);
   for (i = 0; !full() && i<n; i++) {
     insert(arr[i]);
   }
   if (i < n) {
-    assert(i == get_maxsize());
+    assert(i == this->maxsize);
     return n - i;
   } else {
     return 0;
@@ -756,13 +756,9 @@
 
 template <class T>
 class UnboundedMinMaxHeap : public BasicMinMaxHeap<T> {
-
-using BasicMinMaxHeap<T>::A;
-using BasicMinMaxHeap<T>::maxsize;
-using BasicMinMaxHeap<T>::size;
-
 public:
   UnboundedMinMaxHeap() : BasicMinMaxHeap<T>(MMHEAP_INITIAL_SIZE) {};
+  UnboundedMinMaxHeap(HeapIndex size) : BasicMinMaxHeap<T>(size) {};
   virtual ~UnboundedMinMaxHeap() {};
 protected:
   virtual void grow();
@@ -770,14 +766,18 @@
 
 template <class T>
 void UnboundedMinMaxHeap<T>::grow() {
-  T *old = A;
-  maxsize *= 2;
+  T *old = this->A;
+  this->maxsize *= 2;
 
+  assert(this->maxsize > 0);
+
   if(old) {
-	A = allocateHeap(maxsize);	/* allocate a new array */
+	HeapIndex n = this->size();
+	this->A = allocateHeap(this->maxsize);	/* allocate a new array */
 	/* copy over the old values */
-	for(int i=0; i<size()+1; i++) {
-	  A[i] = old[i];
+	assert(this->maxsize > n);
+	for(HeapIndex i=0; i<=n; i++) {	/* why extra value? -RW */
+	  this->A[i] = old[i];
 	}	
 	freeHeap(old);				/* free up old storage */
   }

Modified: grass/trunk/include/iostream/mm.h
===================================================================
--- grass/trunk/include/iostream/mm.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/mm.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -16,7 +16,6 @@
  *
  *****************************************************************************/
 
-
 #ifndef _MM_H
 #define _MM_H
 
@@ -83,8 +82,10 @@
   // flag indicates how we are keeping track of memory 
   static MM_mode register_new;
 
-protected: 
-  // private methods, only called by operators new and delete.
+//protected: 
+//  // private methods, only called by operators new and delete.
+
+public: //  Need to be accessible from pqueue constructor
   MM_err register_allocation  (size_t sz);
   MM_err register_deallocation(size_t sz);
 
@@ -110,6 +111,7 @@
 
   friend class mm_register_init;
   friend void * operator new(size_t);
+  friend void * operator new[](size_t);
   friend void operator delete(void *);
   friend void operator delete[](void *);
 };

Modified: grass/trunk/include/iostream/mm_utils.h
===================================================================
--- grass/trunk/include/iostream/mm_utils.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/mm_utils.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -16,14 +16,13 @@
  *
  *****************************************************************************/
 
-
 #ifndef MM_UTIL_H
 #define MM_UTIL_H
 
-
 #include "mm.h"
 #include <string>
 
+
 void  LOG_avail_memo();
 
 size_t getAvailableMemory();

Modified: grass/trunk/include/iostream/pqheap.h
===================================================================
--- grass/trunk/include/iostream/pqheap.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/pqheap.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -16,14 +16,13 @@
  *
  *****************************************************************************/
 
-
 #ifndef _PQHEAP_H
 #define _PQHEAP_H
 
 #include <assert.h>
 #include <stdlib.h>
 
-#define PQHEAP_MEM_DEBUG 0
+#define PQHEAP_MEM_DEBUG if(0)
 
 
 //HEAPSTATUS can be defined at compile time
@@ -126,9 +125,7 @@
   inline unsigned int num_elts(void);
   
   // How many elements? sorry - i could never remember num_elts
-  inline unsigned int size(void) {
-    return cur_elts;
-  }
+  inline unsigned int size(void) const { return cur_elts; };
   
   // Min
   inline bool min(T& elt);
@@ -192,13 +189,12 @@
   elements = new T [size];
   cout << "pqheap_t1: register memory\n"; 
   cout.flush();
-#if PQHEAP_MEM_DEBUG
-  cout << "pqheap_t1::pq_heap_t1: allocate\n";
-  MMmanager.print();
-#endif
+  PQHEAP_MEM_DEBUG cout << "pqheap_t1::pq_heap_t1: allocate\n";
+  //  PQHEAP_MEM_DEBUG MMmanager.print();
+
   
   if (!elements) {
-    cout << "could not allocate priority queue: insufficient memory..\n";
+	cerr << "could not allocate priority queue: insufficient memory..\n";
     exit(1);
   }
   assert(elements);
@@ -223,6 +219,13 @@
 template <class T>
 inline 
 pqheap_t1<T>::pqheap_t1(T* a, unsigned int size) {
+  {
+	static int flag = 0;
+	if(!flag) {
+	  cerr << "Using slow build in pqheap_t1" << endl;
+	  flag = 1;
+	}
+  }
 
   elements = a;
   max_elts = size;
@@ -503,6 +506,7 @@
 template <class T>
 inline void
 pqheap_t1<T>::delete_min_and_insert(const T &x) {
+  assert(cur_elts);
   elements[0] = x;
   heapify(0);
 }

Modified: grass/trunk/include/iostream/queue.h
===================================================================
--- grass/trunk/include/iostream/queue.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/queue.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -16,8 +16,6 @@
  *
  *****************************************************************************/
 
-
-
 #ifndef QUEUE_H
 #define QUEUE_H
 
@@ -48,6 +46,9 @@
 
 template<class T> 
 queue<T>::queue(int vsize) : size(vsize) {
+
+  if(size <= 0) size = 64;		/* default */
+
   data = new T[size];
   head = 0;
   tail = 0;

Modified: grass/trunk/include/iostream/quicksort.h
===================================================================
--- grass/trunk/include/iostream/quicksort.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/quicksort.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -41,11 +41,13 @@
     
     // Try to get a good partition value and avoid being bitten by already
     // sorted input.
+    //ptpart = data + (random() % n);
 #ifdef __MINGW32__
     ptpart = data + (rand() % n);
 #else
     ptpart = data + (random() % n);
 #endif
+
     tpart = *ptpart;
     *ptpart = data[0];
     data[0] = tpart;

Modified: grass/trunk/include/iostream/replacementHeap.h
===================================================================
--- grass/trunk/include/iostream/replacementHeap.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/replacementHeap.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -16,7 +16,6 @@
  *
  *****************************************************************************/
 
-
 #ifndef REPLACEMENT_QUEUE_H
 #define REPLACEMENT_QUEUE_H 
 
@@ -95,7 +94,7 @@
   ReplacementHeap<T,Compare>(size_t arity, queue<char*>* runList);
   
   //delete array mergeHeap 
-  ~ReplacementHeap();
+  ~ReplacementHeap<T,Compare>();
   
   //is heap empty?
   int empty() const { 
@@ -131,7 +130,7 @@
 /*****************************************************************/
 template<class T,class Compare>
 ReplacementHeap<T,Compare>::ReplacementHeap(size_t g_arity, 
-						       queue<char*>* runList) {
+					    queue<char*>* runList) {
   char* name=NULL;
   
   assert(runList && g_arity > 0);
@@ -399,4 +398,3 @@
 
 
 #endif
-

Modified: grass/trunk/include/iostream/replacementHeapBlock.h
===================================================================
--- grass/trunk/include/iostream/replacementHeapBlock.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/replacementHeapBlock.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -16,7 +16,6 @@
  *
  *****************************************************************************/
 
-
 #ifndef REPLACEMENT_HEAPBLOCK_H
 #define REPLACEMENT_HEAPBLOCK_H
 
@@ -97,7 +96,7 @@
   ReplacementHeapBlock<T,Compare>(queue <MEM_STREAM<T>*> *runList); 
   
   //delete array mergeHeap 
-  ~ReplacementHeapBlock();
+  ~ReplacementHeapBlock<T,Compare>();
   
   //is heap empty?
   int empty() const { 
@@ -375,4 +374,3 @@
 
 
 #endif
-

Modified: grass/trunk/include/iostream/rtimer.h
===================================================================
--- grass/trunk/include/iostream/rtimer.h	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/rtimer.h	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -78,8 +78,8 @@
         perror("rusage/gettimeofday");			\
         exit(1);								\
   }
-	
 
+
 #define rt_u_useconds(rt)							\
 	(((double)rt.rut2.ru_utime.tv_usec +			\
 	  (double)rt.rut2.ru_utime.tv_sec*1000000)		\
@@ -98,16 +98,21 @@
 	  - ((double)rt.tv1.tv_usec +			\
 		 (double)rt.tv1.tv_sec*1000000))
 
+
 #endif /* __MINGW32__ */
 
+
+
+
 /* not required to be called, but makes values print as 0. 
    obviously a hack */
 #define rt_zero(rt) bzero(&(rt),sizeof(Rtimer));
-
+	
 #define rt_seconds(rt) (rt_w_useconds(rt)/1000000)
 
 #define rt_sprint(buf, rt) rt_sprint_safe(buf,rt)
 
 char * rt_sprint_safe(char *buf, Rtimer rt);
 
+
 #endif /* RTIMER_H */

Modified: grass/trunk/lib/iostream/ami_stream.cc
===================================================================
--- grass/trunk/lib/iostream/ami_stream.cc	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/lib/iostream/ami_stream.cc	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -16,7 +16,6 @@
  *
  *****************************************************************************/
 
-
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <stdio.h>
@@ -26,8 +25,26 @@
 #include <errno.h>
 #include <unistd.h>
 
+//#include <ami_stream.h>
 #include <grass/iostream/ami_stream.h>
 
+
+char *ami_str_error[] = {
+  "AMI_ERROR_NO_ERROR",
+  "AMI_ERROR_IO_ERROR",
+  "AMI_ERROR_END_OF_STREAM",
+  "AMI_ERROR_OUT_OF_RANGE",
+  "AMI_ERROR_READ_ONLY",
+  "AMI_ERROR_OS_ERROR",
+  "AMI_ERROR_MM_ERROR",
+  "AMI_ERROR_OBJECT_INITIALIZATION",
+  "AMI_ERROR_PERMISSION_DENIED",
+  "AMI_ERROR_INSUFFICIENT_MAIN_MEMORY",
+  "AMI_ERROR_INSUFFICIENT_AVAILABLE_STREAMS",
+  "AMI_ERROR_ENV_UNDEFINED",
+  "AMI_ERROR_NO_MAIN_MEMORY_OPERATION",
+};
+
 /**********************************************************************/
 /* creates a random file name, opens the file for reading and writing
    and and returns a file descriptor */
@@ -39,9 +56,13 @@
 
   // get the dir
   base_dir = getenv(STREAM_TMPDIR);
-  assert(base_dir);
+  if(!base_dir) {
+	fprintf(stderr, "ami_stream: %s not set\n", STREAM_TMPDIR);
+	assert(base_dir);
+	exit(1);
+  }
+  sprintf(tmp_path, "%s/%s_XXXXXX", base_dir, base.c_str());
 
-  sprintf(tmp_path, "%s/%s_XXXXXX", base_dir, base.c_str());
 #ifdef __MINGW32__
   fd = mktemp(tmp_path) ? open(tmp_path, O_CREAT|O_EXCL|O_RDWR, 0600) : -1;
 #else
@@ -73,17 +94,23 @@
   case   AMI_WRITE_STREAM:
     fp = fdopen(fd, "wb");
     break;
+  case AMI_APPEND_WRITE_STREAM:
+    fp = fdopen(fd, "ab");
+    break;
   case AMI_APPEND_STREAM:
     fp = fdopen(fd, "ab+");
     break;
   case AMI_READ_WRITE_STREAM: 
-      fp = fdopen(fd, "rb+");
-      if (!fp) {
-	//if file does not exist, create it
-	fp = fdopen(fd, "wb+");
-      }
-      break;
+	fp = fdopen(fd, "rb+");
+	if (!fp) {
+	  //if file does not exist, create it
+	  fp = fdopen(fd, "wb+");
+	}
+	break;
   }
+  if(!fp) {
+    perror("fdopen");
+  }
   assert(fp);
 
   return fp;
@@ -105,6 +132,9 @@
   case   AMI_WRITE_STREAM:
     fp = fopen(pathname, "wb");
     break;
+  case AMI_APPEND_WRITE_STREAM:
+    fp = fopen(pathname, "ab");
+    break;
   case AMI_APPEND_STREAM:
     fp = fopen(pathname, "ab+");
     assert(fp);
@@ -121,7 +151,7 @@
       break;
   }
   if (!fp) {
-    perror("cannot open stream");
+    perror(pathname);
     assert(0);
     exit(1);
   }

Modified: grass/trunk/lib/iostream/mm.cc
===================================================================
--- grass/trunk/lib/iostream/mm.cc	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/lib/iostream/mm.cc	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -24,6 +24,8 @@
 #include <assert.h>
 #include <iostream>
 using namespace std;
+
+//#include <mm.h>
 #include <grass/iostream/mm.h>
 
 #define MM_DEBUG if(0)
@@ -254,6 +256,57 @@
 
  
 /* ************************************************************ */
+void* operator new[] (size_t sz) {
+  void *p;
+  
+  MM_DEBUG cout << "new: sz=" << sz << ", register " 
+		<< sz+SIZE_SPACE << "B ,"; 
+
+  if (MM_manager.register_allocation (sz + SIZE_SPACE) != MM_ERROR_NO_ERROR){
+    //must be MM_ERROR_INSUF_SPACE
+    switch(MM_manager.register_new) {
+      
+    case MM_ABORT_ON_MEMORY_EXCEEDED:
+      cerr << "MM error: limit ="<< MM_manager.memory_limit() <<"B. " 
+	   << "allocating " << sz << "B. " 
+	   << "limit exceeded by " 
+	   <<  MM_manager.memory_used() -  MM_manager.memory_limit()<<"B."
+	   << endl;
+      assert (0);		// core dump if debugging
+      exit (1);
+      break;
+      
+    case MM_WARN_ON_MEMORY_EXCEEDED:
+      cerr << "MM warning: limit="<<MM_manager.memory_limit() <<"B. " 
+	   << "allocating " << sz << "B. " 
+	   << " limit exceeded by " 
+	   <<  MM_manager.memory_used() -  MM_manager.memory_limit()<<"B."
+	   << endl;
+      break;
+      
+    case MM_IGNORE_MEMORY_EXCEEDED:
+      break;
+    }
+  }
+  
+  p = malloc(sz + SIZE_SPACE);
+  
+  if (!p) {
+    cerr << "new: out of memory while allocating " << sz << "B" << endl;
+    assert(0);
+    exit (1);
+  }
+  
+  *((size_t *) p) = sz;
+  
+  MM_DEBUG cout << "ptr=" << (void*) (((char *) p) + SIZE_SPACE) << endl;
+  
+  return ((char *) p) + SIZE_SPACE;
+}
+
+
+ 
+/* ************************************************************ */
 void* operator new (size_t sz) {
   void *p;
   
@@ -319,7 +372,8 @@
     //destructor for something that was not allocated with new
     //e.g. ofstream str(name) ---- ~ofstream() called ==> ptr=NULL
     
-    //assert(0); 
+	//who wrote the above comment? -RW
+    assert(0); 
     //exit(1);
     return;
   }

Modified: grass/trunk/lib/iostream/mm_utils.cc
===================================================================
--- grass/trunk/lib/iostream/mm_utils.cc	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/lib/iostream/mm_utils.cc	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -17,18 +17,14 @@
  *****************************************************************************/
 
 #include <sys/types.h>
+#include <sys/mman.h>
 #include <ctype.h>
-
-#if __GNUC__ > 3 || (__GNUC__ == 3 && __GNUC_MINOR__ >= 1)
 #include <ostream>
-#else
-#include <ostream.h>
-#endif
-
 #include <iostream>
 using namespace std;
 #include <stdio.h>
 
+//#include <mm.h>
 #include <grass/iostream/mm.h>
 
 
@@ -46,7 +42,8 @@
   return fmem;
 }
 
-void MEMORY_LOG(std::string str) {
+void 
+MEMORY_LOG(std::string str) {
   printf("%s", str.c_str());
   fflush(stdout);
 }

Modified: grass/trunk/lib/iostream/rtimer.cc
===================================================================
--- grass/trunk/lib/iostream/rtimer.cc	2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/lib/iostream/rtimer.cc	2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
 /****************************************************************************
  * 
- *  MODULE:	r.terraflow
+ *  MODULE:	iostream
  *
  *  COPYRIGHT (C) 2007 Laura Toma
  *   
@@ -16,12 +16,13 @@
  *
  *****************************************************************************/
 
-
 #include <sys/time.h>
+#include <sys/resource.h>
 #include <stdio.h>
 #include <string.h>
 #include <strings.h>
 
+//#include <rtimer.h>
 #include <grass/iostream/rtimer.h>
 
 char *



More information about the grass-commit mailing list