[GRASS-SVN] r32625 - in grass/trunk: include/iostream lib/iostream
svn_grass at osgeo.org
svn_grass at osgeo.org
Thu Aug 7 17:13:14 EDT 2008
Author: pkelly
Date: 2008-08-07 17:13:13 -0400 (Thu, 07 Aug 2008)
New Revision: 32625
Modified:
grass/trunk/include/iostream/ami.h
grass/trunk/include/iostream/ami_config.h
grass/trunk/include/iostream/ami_sort.h
grass/trunk/include/iostream/ami_sort_impl.h
grass/trunk/include/iostream/ami_stream.h
grass/trunk/include/iostream/embuffer.h
grass/trunk/include/iostream/empq.h
grass/trunk/include/iostream/empq_adaptive.h
grass/trunk/include/iostream/empq_adaptive_impl.h
grass/trunk/include/iostream/empq_impl.h
grass/trunk/include/iostream/imbuffer.h
grass/trunk/include/iostream/mem_stream.h
grass/trunk/include/iostream/minmaxheap.h
grass/trunk/include/iostream/mm.h
grass/trunk/include/iostream/mm_utils.h
grass/trunk/include/iostream/pqheap.h
grass/trunk/include/iostream/queue.h
grass/trunk/include/iostream/quicksort.h
grass/trunk/include/iostream/replacementHeap.h
grass/trunk/include/iostream/replacementHeapBlock.h
grass/trunk/include/iostream/rtimer.h
grass/trunk/lib/iostream/ami_stream.cc
grass/trunk/lib/iostream/mm.cc
grass/trunk/lib/iostream/mm_utils.cc
grass/trunk/lib/iostream/rtimer.cc
Log:
Updates to iostream library from Laura Toma.
Modified: grass/trunk/include/iostream/ami.h
===================================================================
--- grass/trunk/include/iostream/ami.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/ami.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,7 @@
+
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -16,7 +17,6 @@
*
*****************************************************************************/
-
#ifndef _AMI_H
#define _AMI_H
Modified: grass/trunk/include/iostream/ami_config.h
===================================================================
--- grass/trunk/include/iostream/ami_config.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/ami_config.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,7 @@
+
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
Modified: grass/trunk/include/iostream/ami_sort.h
===================================================================
--- grass/trunk/include/iostream/ami_sort.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/ami_sort.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -76,8 +76,9 @@
template<class T, class Compare>
AMI_err
AMI_sort(AMI_STREAM<T> *instream, AMI_STREAM<T> **outstream, Compare *cmp,
- int deleteInputStream = 0) {
- char* name;
+ int deleteInputStream = 0)
+{
+ char* name=NULL;
queue<char*>* runList;
int instreamLength;
@@ -102,18 +103,30 @@
//run formation
runList = runFormation(instream, cmp);
- assert(runList && runList->length() > 0);
+ assert(runList);
if (deleteInputStream) {
delete instream;
}
- if (runList->length() == 1) {
+ if(runList->length() == 0) {
+ /* self-check */
+ fprintf(stderr, "ami_sort: Error - no runs created!\n");
+ instream->name(&name);
+ cout << "ami_sort: instream = " << name << endl;
+ exit(1);
+ /* no input... */
+ /* *outstream = new AMI_STREAM<T>(); */
+
+ } else if(runList->length() == 1) {
//if 1 run only
runList->dequeue(&name);
+ //printf("SORT: %s\n", name); fflush(stdout);
*outstream = new AMI_STREAM<T>(name);
delete name; //should be safe, stream makes its own copy
- } else {
+
+ } else {
+ /* many runs */
*outstream = multiMerge<T,Compare>(runList, cmp);
//i thought the templates are not needed in the call, but seems to
//help the compiler..laura
Modified: grass/trunk/include/iostream/ami_sort_impl.h
===================================================================
--- grass/trunk/include/iostream/ami_sort_impl.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/ami_sort_impl.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,7 @@
+
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -36,7 +37,6 @@
#define BLOCKED_RUN
-
/* ---------------------------------------------------------------------- */
//set run_size, last_run_size and nb_runs depending on how much memory
//is available
@@ -45,16 +45,18 @@
initializeRunFormation(AMI_STREAM<T> *instream,
size_t &run_size, size_t &last_run_size,
unsigned int &nb_runs) {
-
- off_t strlen = instream->stream_len();
+
size_t mm_avail = MM_manager.memory_available();
+ off_t strlen;
+
#ifdef BLOCKED_RUN
// not in place, can only use half memory
mm_avail = mm_avail/2;
#endif
-
run_size = mm_avail/sizeof(T);
+
+ strlen = instream->stream_len();
if (strlen == 0) {
nb_runs = last_run_size = 0;
} else {
@@ -66,6 +68,7 @@
last_run_size = strlen % run_size;
}
}
+
SDEBUG cout << "nb_runs=" << nb_runs
<< ", run_size=" << run_size
<< ", last_run_size=" << last_run_size
@@ -78,17 +81,19 @@
/* data is allocated; read run_size elements from stream into data and
sort them using quicksort */
template<class T, class Compare>
-void makeRun_Block(AMI_STREAM<T> *instream, T* data,
+size_t makeRun_Block(AMI_STREAM<T> *instream, T* data,
unsigned int run_size, Compare *cmp) {
AMI_err err;
-
+ off_t new_run_size;
+
//read next run from input stream
- err = instream->read_array(data, run_size);
- assert(err == AMI_ERROR_NO_ERROR);
-
+ err = instream->read_array(data, run_size, &new_run_size);
+ assert(err == AMI_ERROR_NO_ERROR || err == AMI_ERROR_END_OF_STREAM);
+
//sort it in memory in place
- quicksort(data, run_size, *cmp);
+ quicksort(data, new_run_size, *cmp);
+ return new_run_size;
}
@@ -105,6 +110,7 @@
unsigned int nblocks, last_block_size, crt_block_size, block_size;
+
block_size = STREAM_BUFFER_SIZE;
if (run_size % block_size == 0) {
@@ -178,16 +184,18 @@
SDEBUG cout << "runFormation: ";
SDEBUG MM_manager.print();
+ /* leave this in for now, in case some file-based implementations do
+ anything funny... -RW */
//rewind file
instream->seek(0); //should check error xxx
-
+
//estimate run_size, last_run_size and nb_runs
initializeRunFormation(instream, run_size, last_run_size, nb_runs);
-
- //create runList
+
+ //create runList (if 0 size, queue uses default)
runList = new queue<char*>(nb_runs);
-
- //allocate space for a run
+
+ /* allocate space for a run */
if (nb_runs <= 1) {
//don't waste space if input stream is smaller than run_size
data = new T[last_run_size];
@@ -195,18 +203,20 @@
data = new T[run_size];
}
SDEBUG MM_manager.print();
-
- for (size_t i=0; i< nb_runs; i++) {
+
+ //for (size_t i=0; i< nb_runs; i++) {
+ while(!instream->eof()) {
+ //crt_run_size = (i == nb_runs-1) ? last_run_size: run_size;
- crt_run_size = (i == nb_runs-1) ? last_run_size: run_size;
-
//SDEBUG cout << "i=" << i << ": runsize=" << crt_run_size << ", ";
-#ifdef BLOCKED_RUN
- makeRun(instream, data, crt_run_size, cmp);
-#else
- makeRun_Block(instream, data, crt_run_size, cmp);
-#endif
+ crt_run_size = makeRun_Block(instream, data, run_size, cmp);
+/* #ifdef BLOCKED_RUN */
+/* makeRun(instream, data, crt_run_size, cmp); */
+/* #else */
+/* makeRun_Block(instream, data, crt_run_size, cmp); */
+/* #endif */
+
SDEBUG MM_manager.print();
//read next run from input stream
@@ -214,19 +224,21 @@
//assert(err == AMI_ERROR_NO_ERROR);
//sort it in memory in place
//quicksort(data, crt_run_size, *cmp);
+
+ if(crt_run_size > 0) {
+ //create a new stream to hold this run
+ str = new AMI_STREAM<T>();
+ str->write_array(data, crt_run_size);
+ assert(str->stream_len() == crt_run_size);
- //create a new stream to hold this run
- str = new AMI_STREAM<T>();
- str->write_array(data, crt_run_size);
- assert(str->stream_len() == crt_run_size);
-
- //remember this run's name
- str->name(&strname);
- runList->enqueue(strname);
-
- //delete the stream -- should not keep too many streams open
- str->persist(PERSIST_PERSISTENT);
- delete str;
+ //remember this run's name
+ str->name(&strname); /* deleted after we dequeue */
+ runList->enqueue(strname);
+ //delete the stream -- should not keep too many streams open
+ str->persist(PERSIST_PERSISTENT);
+ delete str;
+ }
+
};
SDEBUG MM_manager.print();
//release the run memory!
@@ -263,7 +275,8 @@
template<class T, class Compare>
AMI_STREAM<T>*
-singleMerge(queue<char*>* streamList, Compare *cmp) {
+singleMerge(queue<char*>* streamList, Compare *cmp)
+{
AMI_STREAM<T>* mergedStr;
size_t mm_avail, blocksize;
unsigned int arity, max_arity;
@@ -275,18 +288,26 @@
//estimate max possible merge arity with available memory (approx M/B)
mm_avail = MM_manager.memory_available();
- blocksize = getpagesize();
+ //blocksize = getpagesize();
//should use AMI function, but there's no stream at this point
- // AMI_STREAM<T>::main_memory_usage(&blocksize, MM_STREAM_USAGE_BUFFER);
- max_arity = mm_avail/blocksize;
+ //now use static mtd -RW 5/05
+ AMI_STREAM<T>::main_memory_usage(&blocksize, MM_STREAM_USAGE_MAXIMUM);
+ max_arity = mm_avail / blocksize;
+ if(max_arity < 2) {
+ cerr << __FILE__ ":" << __LINE__ << ": OUT OF MEMORY in singleMerge (going over limit)" << endl;
+ max_arity = 2;
+ } else if(max_arity > MAX_STREAMS_OPEN) {
+ max_arity = MAX_STREAMS_OPEN;
+ }
arity = (streamList->length() < max_arity) ?
streamList->length() : max_arity;
SDEBUG cout << "arity=" << arity << " (max_arity=" <<max_arity<< ")\n";
-
+
+ /* create the output stream. if this is a complete merge, use finalpath */
//create output stream
mergedStr = new AMI_STREAM<T>();
-
+
ReplacementHeap<T,Compare> rheap(arity, streamList);
SDEBUG rheap.print(cerr);
@@ -325,7 +346,8 @@
template<class T, class Compare>
AMI_STREAM<T>*
-multiMerge(queue<char*>* runList, Compare *cmp) {
+multiMerge(queue<char*>* runList, Compare *cmp)
+{
AMI_STREAM<T> * mergedStr= NULL;
char* path;
Modified: grass/trunk/include/iostream/ami_stream.h
===================================================================
--- grass/trunk/include/iostream/ami_stream.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/ami_stream.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -31,15 +31,12 @@
#include <unistd.h>
#include <iostream>
-#include <cstring>
using namespace std;
+#define MAX_STREAMS_OPEN 200
+
#include "mm.h" // Get the memory manager.
-#ifdef __MINGW32__
-#define getpagesize() (4096)
-#endif
-
#define DEBUG_DELETE if(0)
// The name of the environment variable which keeps the name of the
@@ -49,7 +46,7 @@
// All streams will be names STREAM_*****
#define BASE_NAME "STREAM"
-#define STREAM_BUFFER_SIZE (1<<15)
+#define STREAM_BUFFER_SIZE (1<<18)
//
@@ -71,6 +68,7 @@
AMI_ERROR_NO_MAIN_MEMORY_OPERATION,
};
+extern char *ami_str_error[];
//
// AMI stream types passed to constructors
@@ -79,7 +77,8 @@
AMI_READ_STREAM = 1, // Open existing stream for reading
AMI_WRITE_STREAM, // Open for writing. Create if non-existent
AMI_APPEND_STREAM, // Open for writing at end. Create if needed.
- AMI_READ_WRITE_STREAM // Open to read and write.
+ AMI_READ_WRITE_STREAM, // Open to read and write.
+ AMI_APPEND_WRITE_STREAM // Open for writing at end (write only mode).
};
@@ -94,13 +93,11 @@
PERSIST_READ_ONCE
};
-
-
-template<class T>
-class AMI_STREAM {
-private:
+/* an un-templated version makes for easier debugging */
+class UntypedStream {
+protected:
FILE * fp;
- //int fd; //descriptor of file
+ int fildes; //descriptor of file
AMI_stream_type access_mode;
char path[BUFSIZ];
persistence per;
@@ -117,35 +114,45 @@
//stream buffer passed in the call to setvbuf when file is opened
char* buf;
+ int eof_reached;
+ public:
+ static unsigned int get_block_length() {
+ return STREAM_BUFFER_SIZE;
+ //return getpagesize();
+ };
+
+};
+
+template<class T>
+class AMI_STREAM : public UntypedStream {
+
protected:
- unsigned int get_block_length();
-
+
+ T read_tmp; /* this is ugly... RW */
+
public:
- T read_tmp;
-
// An AMI_stream with default name
AMI_STREAM();
// An AMI stream based on a specific path name.
- AMI_STREAM(const char *path_name,
- // AMI_stream_type st = AMI_READ_WRITE_STREAM);
- AMI_stream_type st);
+ AMI_STREAM(const char *path_name, AMI_stream_type st);
+
+ // convenience function with split path_name
+ //AMI_STREAM(const char *dir_name, const char *file_name, AMI_stream_type st);
+
// A psuedo-constructor for substreams.
AMI_err new_substream(AMI_stream_type st, off_t sub_begin, off_t sub_end,
AMI_STREAM<T> **sub_stream);
-
+
// Destructor
~AMI_STREAM(void);
// Read and write elements.
AMI_err read_item(T **elt);
-
AMI_err write_item(const T &elt);
-
- AMI_err read_array(T *data, off_t len);
-
+ AMI_err read_array(T *data, off_t len, off_t *lenp);
AMI_err write_array(const T *data, off_t len);
// Return the number of items in the stream.
@@ -153,32 +160,34 @@
// Return the path name of this stream.
AMI_err name(char **stream_name);
+ const char* name() const;
// Move to a specific item in the stream.
AMI_err seek(off_t offset);
-
+
// Query memory usage
- AMI_err main_memory_usage(size_t *usage,
+ static AMI_err main_memory_usage(size_t *usage,
//MM_stream_usage usage_type= MM_STREAM_USAGE_OVERHEAD);
MM_stream_usage usage_type);
void persist(persistence p);
char *sprint();
+
+ // have we hit the end of the stream
+ int eof();
};
/**********************************************************************/
-template<class T>
-unsigned int AMI_STREAM<T>::get_block_length() {
- return getpagesize();
-}
/**********************************************************************/
/* creates a random file name, opens the file for reading and writing
and and returns a file descriptor */
-int ami_single_temp_name(const std::string& base, char* tmp_path);
+/* int ami_single_temp_name(char *base, char* tmp_path); */
+/* fix from Andy Danner */
+int ami_single_temp_name(const std::string& base, char* tmp_path);
/**********************************************************************/
@@ -201,8 +210,8 @@
access_mode = AMI_READ_WRITE_STREAM;
int fd = ami_single_temp_name(BASE_NAME, path);
+ fildes = fd;
fp = open_stream(fd, access_mode);
-
/* a stream is by default buffered with a buffer of size BUFSIZ=1K */
buf = new char[STREAM_BUFFER_SIZE];
@@ -218,8 +227,11 @@
substream_level = 0;
logical_bos = logical_eos = -1;
+ // why is this here in the first place?? -RW
seek(0);
+ eof_reached = 0;
+
// Register memory usage before returning.
//size_t usage;
//main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
@@ -233,12 +245,19 @@
template<class T>
AMI_STREAM<T>::AMI_STREAM(const char *path_name,
AMI_stream_type st = AMI_READ_WRITE_STREAM) {
-
+
access_mode = st;
- strcpy(path, path_name);
- fp = open_stream(path, st);
-
+ if(path_name == NULL) {
+ int fd = ami_single_temp_name(BASE_NAME, path);
+ fildes = fd;
+ fp = open_stream(fd, access_mode);
+ } else {
+ strcpy(path, path_name);
+ fp = open_stream(path, st);
+ fildes = -1;
+ }
+
/* a stream is by default buffered with a buffer of size BUFSIZ=1K */
buf = new char[STREAM_BUFFER_SIZE];
if (setvbuf(fp, buf, _IOFBF, STREAM_BUFFER_SIZE) != 0) {
@@ -246,9 +265,15 @@
exit(1);
}
+ eof_reached = 0;
+
// By default, all streams are deleted at destruction time.
- per = PERSIST_DELETE;
-
+ if(st == AMI_READ_STREAM) {
+ per = PERSIST_PERSISTENT;
+ } else {
+ per = PERSIST_DELETE;
+ }
+
// Not a substream.
substream_level = 0;
logical_bos = logical_eos = -1;
@@ -303,12 +328,13 @@
// Set the current position.
substr->seek(0);
-
+
+ substr->eof_reached = 0;
+
//set substream level
substr->substream_level = substream_level + 1;
- //set persistence
- substr->per = per;
+ substr->per = per; //set persistence
//*sub_stream = (AMI_base_stream < T > *)substr;
*sub_stream = substr;
@@ -327,6 +353,7 @@
struct stat buf;
if (stat(path, &buf) == -1) {
perror("AMI_STREAM::stream_len(): fstat failed ");
+ perror(path);
assert(0);
exit(1);
}
@@ -349,8 +376,15 @@
return AMI_ERROR_NO_ERROR;
};
+// Return the path name of this stream.
+template<class T>
+const char *
+AMI_STREAM<T>::name() const {
+ return path;
+};
+
/**********************************************************************/
// Move to a specific offset within the (sub)stream.
template<class T>
@@ -358,8 +392,7 @@
off_t seek_offset;
- if (substream_level) {
- //substream
+ if (substream_level) { //substream
if (offset > (unsigned) (logical_eos - logical_bos)) {
//offset out of range
cerr << "AMI_STREAM::seek bos=" << logical_bos << ", eos=" << logical_eos
@@ -377,8 +410,8 @@
seek_offset = offset * sizeof(T);
}
- if (fseek(fp, seek_offset, SEEK_SET) == -1) {
- cerr << "AMI_STREAM::seek offset=" << seek_offset << "failed.\n";
+ if (fseeko(fp, seek_offset, SEEK_SET) == -1) {
+ cerr << "AMI_STREAM::seek offset=" << seek_offset << " failed.\n";
assert(0);
exit(1);
}
@@ -392,12 +425,13 @@
/**********************************************************************/
// Query memory usage
template<class T>
-AMI_err AMI_STREAM<T>::main_memory_usage(size_t *usage,
- MM_stream_usage usage_type= MM_STREAM_USAGE_OVERHEAD) {
+AMI_err
+AMI_STREAM<T>::main_memory_usage(size_t *usage,
+ MM_stream_usage usage_type= MM_STREAM_USAGE_OVERHEAD) {
switch (usage_type) {
case MM_STREAM_USAGE_OVERHEAD:
- *usage = sizeof (*this);
+ *usage = sizeof (AMI_STREAM<T>);
break;
case MM_STREAM_USAGE_BUFFER:
// *usage = get_block_length();
@@ -406,7 +440,7 @@
case MM_STREAM_USAGE_CURRENT:
case MM_STREAM_USAGE_MAXIMUM:
// *usage = sizeof (*this) + get_block_length();
- *usage = sizeof (*this) + STREAM_BUFFER_SIZE*sizeof(char);
+ *usage = sizeof (AMI_STREAM<T>) + STREAM_BUFFER_SIZE*sizeof(char);
break;
}
return AMI_ERROR_NO_ERROR;
@@ -417,15 +451,15 @@
/**********************************************************************/
template<class T>
AMI_STREAM<T>::~AMI_STREAM(void) {
-
+
DEBUG_DELETE cerr << "~AMI_STREAM: " << path << "(" << this << ")\n";
- delete buf;
assert(fp);
fclose(fp);
+ delete buf;
// Get rid of the file if not persistent and if not substream.
if ((per != PERSIST_PERSISTENT) && (substream_level == 0)) {
- if (remove(path) == -1) {
+ if (unlink(path) == -1) {
cerr << "AMI_STREAM: failed to unlink " << path << endl;
perror("cannot unlink ");
assert(0);
@@ -445,16 +479,21 @@
AMI_err AMI_STREAM<T>::read_item(T **elt) {
assert(fp);
+
//if we go past substream range
if ((logical_eos >= 0) && ftell(fp) >= sizeof(T) * logical_eos) {
return AMI_ERROR_END_OF_STREAM;
} else {
if (fread((char *) (&read_tmp), sizeof(T), 1, fp) < 1) {
- //cerr << "file=" << path << ":";
- //perror("cannot read!");
- //assume EOF --should fix this XXX
- return AMI_ERROR_END_OF_STREAM;
+ if(feof(fp)) {
+ eof_reached = 1;
+ return AMI_ERROR_END_OF_STREAM;
+ } else {
+ cerr << "file=" << path << ":";
+ perror("cannot read!");
+ return AMI_ERROR_IO_ERROR;
+ }
}
*elt = &read_tmp;
@@ -467,21 +506,30 @@
/**********************************************************************/
template<class T>
-AMI_err AMI_STREAM<T>::read_array(T *data, off_t len) {
-
+AMI_err AMI_STREAM<T>::read_array(T *data, off_t len, off_t *lenp=NULL) {
+ size_t nobj;
assert(fp);
//if we go past substream range
if ((logical_eos >= 0) && ftell(fp) >= sizeof(T) * logical_eos) {
+ eof_reached = 1;
return AMI_ERROR_END_OF_STREAM;
} else {
- if (fread((void*)data, sizeof(T), len, fp) < len) {
- cerr << "file=" << path << ":";
- perror("cannot read!");
- //assume EOF --should fix this XXX
- return AMI_ERROR_END_OF_STREAM;
- }
+ nobj = fread((void*)data, sizeof(T), len, fp);
+
+ if (nobj < len) { /* some kind of error */
+ if(feof(fp)) {
+ if(lenp) *lenp = nobj;
+ eof_reached = 1;
+ return AMI_ERROR_END_OF_STREAM;
+ } else {
+ cerr << "file=" << path << ":";
+ perror("cannot read!");
+ return AMI_ERROR_IO_ERROR;
+ }
+ }
+ if(lenp) *lenp = nobj;
return AMI_ERROR_NO_ERROR;
}
};
@@ -501,9 +549,11 @@
} else {
if (fwrite((char*)(&elt), sizeof(T), 1,fp) < 1) {
cerr << "AMI_STREAM::write_item failed.\n";
+ perror(path);
assert(0);
exit(1);
}
+
return AMI_ERROR_NO_ERROR;
}
};
@@ -512,6 +562,7 @@
/**********************************************************************/
template<class T>
AMI_err AMI_STREAM<T>::write_array(const T *data, off_t len) {
+ size_t nobj;
assert(fp);
//if we go past substream range
@@ -519,12 +570,13 @@
return AMI_ERROR_END_OF_STREAM;
} else {
- if (fwrite(data, sizeof(T), len,fp) < len) {
+ nobj = fwrite(data, sizeof(T), len, fp);
+ if (nobj < len) {
cerr << "AMI_STREAM::write_item failed.\n";
assert(0);
exit(1);
}
- return AMI_ERROR_NO_ERROR;
+ return AMI_ERROR_NO_ERROR;
}
};
@@ -532,7 +584,7 @@
/**********************************************************************/
template<class T>
void AMI_STREAM<T>::persist(persistence p) {
- per = p;
+ per = p;
};
@@ -551,4 +603,11 @@
return buf;
};
+/**********************************************************************/
+template<class T>
+int AMI_STREAM<T>::eof() {
+ return eof_reached;
+};
+
+
#endif // _AMI_STREAM_H
Modified: grass/trunk/include/iostream/embuffer.h
===================================================================
--- grass/trunk/include/iostream/embuffer.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/embuffer.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,7 @@
+
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -21,9 +22,9 @@
#include <stdio.h>
-#include <math.h>
#include <assert.h>
#include <stdlib.h>
+#include <math.h>
#include "ami_config.h" //for SAVE_MEMORY
#include "ami_stream.h"
@@ -293,7 +294,7 @@
unsigned long get_stream_maxlen() const {
return (unsigned long)pow((double)arity,(double)level-1)*basesize;
}
-
+
//return the actual size of stream i; i must be the index of a valid
//stream
unsigned long get_stream_len(unsigned int i) {
@@ -439,9 +440,7 @@
arity, (long)(arity*sizeof(AMI_STREAM<T>*)));
MEMORY_LOG(str);
//allocate STREAM* array
- //GCC-3.4 does not allow (TYPE)[array]
- //use TYPE[array]
- data = new AMI_STREAM<T>*[arity];
+ data = new AMI_STREAM<T>* [arity];
//allocate deleted array
sprintf(str, "em_buffer: allocate deleted array: %ld\n",
@@ -460,10 +459,7 @@
sprintf(str, "em_buffer: allocate name array: %ld\n",
(long)(arity*sizeof(char*)));
MEMORY_LOG(str);
- //GCC-3.4 does not allow (TYPE)[array]
- //use TYPE[array]
- //name = new (char*)[arity];
- name = new char*[arity];
+ name = new char* [arity];
assert(name);
#endif
@@ -805,6 +801,7 @@
#ifdef SAVE_MEMORY
//stream is empty ==> delete its name
+ assert(name[i]);
delete name[i];
name[i] = NULL;
#endif
@@ -831,27 +828,28 @@
for (i=0; i<index; i++) {
//if i'th stream is not empty, shift it left if necessary
if (data[i]) {
- if (i!=j) {
- //set j'th stream to point to i'th stream
- //cout << j << " set to " << i << endl; cout.flush();
- data[j] = data[i];
- deleted[j] = deleted[i];
- streamsize[j] = streamsize[i];
- //set i'th stream to point to NULL
- data[i] = NULL;
- deleted[i] = 0;
- streamsize[i] = 0;
+ if (i!=j) {
+ //set j'th stream to point to i'th stream
+ //cout << j << " set to " << i << endl; cout.flush();
+ data[j] = data[i];
+ deleted[j] = deleted[i];
+ streamsize[j] = streamsize[i];
+ //set i'th stream to point to NULL
+ data[i] = NULL;
+ deleted[i] = 0;
+ streamsize[i] = 0;
#ifdef SAVE_MEMORY
- //fix the names
- delete name[j];
- name[j] = name[i];
- name[i] = NULL;
- check_name(j);
+ //fix the names
+/* already done assert(name[j]); */
+/* delete name[j]; */
+ name[j] = name[i];
+ name[i] = NULL;
+ check_name(j);
#endif
- } else {
- //cout << i << " left the same" << endl;
- }
- j++;
+ } else {
+ //cout << i << " left the same" << endl;
+ }
+ j++;
} //if data[i] != NULL
}//for i
@@ -896,6 +894,7 @@
assert(streamsize[i] == data[i]->stream_len());
#ifdef SAVE_MEMORY
check_name(i);
+ assert(name[i]);
delete name[i];
name[i] = NULL;
#endif
@@ -918,12 +917,13 @@
//all streams of the buffer in sorted ascending order of
//their keys (priorities);
template<class T, class Key>
-AMI_STREAM<T>* em_buffer<T,Key>::sort() {
+AMI_STREAM<T>*
+em_buffer<T,Key>::sort() {
//create stream
MEMORY_LOG("em_buffer::sort: allocate new AMI_STREAM\n");
- AMI_STREAM<T>* sorted_stream = new AMI_STREAM<T>();
+ AMI_STREAM<T>* sorted_stream = new AMI_STREAM<T>(); /* will be deleted in insert() */
assert(sorted_stream);
//merge the streams into sorted stream
Modified: grass/trunk/include/iostream/empq.h
===================================================================
--- grass/trunk/include/iostream/empq.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/empq.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -16,6 +16,7 @@
*
*****************************************************************************/
+
#ifndef __EMPQ_H
#define __EMPQ_H
@@ -202,6 +203,9 @@
//return maximum capacity of em_pqueue
long maxlen();
+ // delete all the data in the pq; reset to empty but don't free memory
+ void clear();
+
//print structure
void print_range();
Modified: grass/trunk/include/iostream/empq_adaptive.h
===================================================================
--- grass/trunk/include/iostream/empq_adaptive.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/empq_adaptive.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,7 @@
+
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -27,7 +28,7 @@
-#define EMPQAD_DEBUG if(0)
+#define EMPQAD_DEBUG if(1)
enum regim_type {
@@ -45,11 +46,13 @@
MinMaxHeap<T> *im;
em_pqueue<T,Key> *em;
UnboundedMinMaxHeap<T> *dim; // debug, internal memory pq
+ void initPQ(size_t);
public:
/* start in INMEM regim by allocating im of size precisely twice the
size of the (pqueue within) the em_pqueue; */
EMPQueueAdaptive(long N) : EMPQueueAdaptive() {};
EMPQueueAdaptive();
+ EMPQueueAdaptive(size_t inMem);
~EMPQueueAdaptive();
void makeExternal();
@@ -74,6 +77,8 @@
long size() const; //return the nb of elements in the structure
+ void clear(); /* delete all contents of pq */
+
void verify();
};
Modified: grass/trunk/include/iostream/empq_adaptive_impl.h
===================================================================
--- grass/trunk/include/iostream/empq_adaptive_impl.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/empq_adaptive_impl.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -16,6 +16,7 @@
*
*****************************************************************************/
+
#ifndef __EMPQ_ADAPTIVE_IMPL_H
#define __EMPQ_ADAPTIVE_IMPL_H
@@ -28,10 +29,10 @@
#include "mm_utils.h"
#include "empq_adaptive.h"
+#include "ami_sort.h"
-//defined in "empqAdaptive.H"
-//#define EMPQAD_DEBUG if(0)
+// EMPQAD_DEBUG is defined in "empq_adaptive.h"
@@ -39,8 +40,30 @@
//------------------------------------------------------------
//allocate an internal pqueue of size precisely twice
//the size of the pqueue within the em_pqueue;
+//
+//This constructor uses a user defined amount of memory
template<class T, class Key>
+EMPQueueAdaptive<T,Key>::EMPQueueAdaptive(size_t inMem) {
+ regim = INMEM;
+ EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: starting in-memory pqueue"
+ << endl;
+
+ //------------------------------------------------------------
+ //set the size precisely as in empq constructor since we cannot
+ //really call the em__pqueue constructor, because we don't want
+ //the space allocated; we just want the sizes;
+ //AMI_err ae;
+ EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: desired memory: "
+ << ( (float)inMem/ (1<< 20)) << "MB" << endl;
+
+ initPQ(inMem);
+};
+
+
+//------------------------------------------------------------
+// This more closely resembles the original constructor, which is greedy
+template<class T, class Key>
EMPQueueAdaptive<T,Key>::EMPQueueAdaptive() {
regim = INMEM;
EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: starting in-memory pqueue"
@@ -50,11 +73,26 @@
//set the size precisely as in empq constructor since we cannot
//really call the em__pqueue constructor, because we don't want
//the space allocated; we just want the sizes;
- AMI_err ae;
size_t mm_avail = getAvailableMemory();
EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: available memory: "
<< ( (float)mm_avail/ (1<< 20)) << "MB" << endl;
+
+ initPQ(mm_avail);
+
+};
+
+
+//------------------------------------------------------------
+// This method initializes the PQ based on the amount of memory passed
+// into it
+template<class T, class Key>
+void
+EMPQueueAdaptive<T,Key>::initPQ(size_t initMem) {
+ AMI_err ae;
+ EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: desired memory: "
+ << ( (float)initMem/ (1<< 20)) << "MB" << endl;
+
/* same calculations as empq constructor in order to estimate
overhead memory; this is because we want to allocate a pqueue of
size exactly double the size of the pqueue inside the empq;
@@ -70,26 +108,35 @@
cerr << "EMPQueueAdaptive constr: failing to get stream_usage\n";
exit(1);
}
+
+
//account for temporary memory usage
unsigned short max_nbuf = 2;
- unsigned int buf_arity = mm_avail/(2 * sz_stream);
+ unsigned int buf_arity = initMem/(2 * sz_stream);
+ if (buf_arity > MAX_STREAMS_OPEN) buf_arity = MAX_STREAMS_OPEN;
unsigned long mm_overhead = buf_arity*sizeof(merge_key<Key>) +
max_nbuf * sizeof(em_buffer<T,Key>) +
2*sz_stream + max_nbuf*sz_stream;
mm_overhead *= 8; //overestimate..this should be fixed with
//a precise accounting of the extra memory required
+
+ EMPQAD_DEBUG cout << "sz_stream: " << sz_stream << " buf_arity: " << buf_arity <<
+ " mm_overhead: " << mm_overhead << " mm_avail: " << initMem << ".\n";
+
+
+
EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: memory overhead set to "
<< ((float)mm_overhead / (1 << 20)) << "MB" << endl;
- if (mm_overhead > mm_avail) {
- cerr << "overhead bigger than available memory ("<< mm_avail << "); "
+ if (mm_overhead > initMem) {
+ cerr << "overhead bigger than available memory ("<< initMem << "); "
<< "increase -m and try again\n";
exit(1);
}
- mm_avail -= mm_overhead;
+ initMem -= mm_overhead;
//------------------------------------------------------------
- long pqsize = mm_avail/sizeof(T);
+ long pqsize = initMem/sizeof(T);
EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: pqsize set to " << pqsize << endl;
//initialize in memory pqueue and set em to NULL
@@ -99,8 +146,6 @@
};
-
-
template<class T, class Key>
EMPQueueAdaptive<T,Key>::~EMPQueueAdaptive() {
switch(regim) {
@@ -123,7 +168,7 @@
template<class T, class Key>
long
EMPQueueAdaptive<T,Key>::maxlen() const {
- long m;
+ long m=-1;
switch(regim) {
case INMEM:
assert(im);
@@ -213,8 +258,26 @@
return v;
};
+/* delete all contents of the pq, whichever regime it is currently in */
template<class T, class Key>
void
+EMPQueueAdaptive<T,Key>::clear() {
+ switch(regim) {
+ case INMEM:
+ im->clear();
+ break;
+ case EXTMEM:
+ em->clear();
+ break;
+ case EXTMEM_DEBUG:
+ dim->clear();
+ break;
+ }
+}
+
+
+template<class T, class Key>
+void
EMPQueueAdaptive<T,Key>::verify() {
switch(regim) {
case INMEM:
@@ -246,7 +309,7 @@
case EXTMEM_DEBUG:
v1 = dim->extract_all_min(tmp);
v = em->extract_all_min(elt);
- assert(dim->BasicMinMaxHeap<T>::size() == em->size());
+ assert(dim->size() == em->size());
assert(v == v1);
assert(tmp == elt);
break;
@@ -269,7 +332,7 @@
v = em->size();
break;
case EXTMEM_DEBUG:
- v1 = dim->BasicMinMaxHeap<T>::size();
+ v1 = dim->size();
v = em->size();
assert(v == v1);
break;
@@ -300,7 +363,7 @@
v = em->extract_min(elt);
assert(v == v1);
assert(tmp == elt);
- assert(dim->BasicMinMaxHeap<T>::size() == em->size());
+ assert(dim->size() == em->size());
break;
}
return v;
@@ -334,7 +397,7 @@
case EXTMEM_DEBUG:
dim->insert(elt);
v = em->insert(elt);
- assert(dim->BasicMinMaxHeap<T>::size() == em->size());
+ assert(dim->size() == em->size());
break;
}
return v;
Modified: grass/trunk/include/iostream/empq_impl.h
===================================================================
--- grass/trunk/include/iostream/empq_impl.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/empq_impl.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,7 @@
+
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -19,20 +20,11 @@
#ifndef __EMPQ_IMPL_H
#define __EMPQ_IMPL_H
-#include <stdio.h>
-
-#if __GNUC__ > 3 || (__GNUC__ == 3 && __GNUC_MINOR__ >= 1)
#include <ostream>
-#else
-#include <ostream.h>
-#endif
-
using namespace std;
#include "empq.h"
-
-
#if(0)
#include "option.H"
#define MY_LOG_DEBUG_ID(x) \
@@ -136,10 +128,12 @@
//ESTIMATE AVAILABLE MEMORY BEFORE ALLOCATION
AMI_err ae;
size_t mm_avail = getAvailableMemory();
- printf("EM_PQUEUE:available memory before allocation: %.2fMB\n",
- mm_avail/(float)(1<<20));
- printf("EM_PQUEUE:available memory before allocation: %ldB\n",
- mm_avail);
+ printf("EM_PQUEUE:available memory before allocation: %.2fMB\n",
+ mm_avail/(float)(1<<20));
+ printf("EM_PQUEUE:available memory before allocation: %ldB\n",
+ mm_avail);
+
+
//____________________________________________________________
//ALLOCATE STRUCTURE
//some dummy checks..
@@ -357,51 +351,59 @@
assert(im && amis);
pqcapacity = im->get_maxsize()/2; // we think this memory is now available
+ pqsize = pqcapacity + 1; //truncate errors
pqcurrentsize = im->size();
- pqsize = pqcapacity;
-
+ //assert( pqcurrentsize <= pqsize);
+ if(!(pqcurrentsize <= pqsize)) {
+ cout << "EMPQ: pq maxsize=" << pqsize <<", pq crtsize=" << pqcurrentsize
+ << "\n";
+ assert(0);
+ exit(1);
+ }
+
+
LOG_avail_memo();
-
+
/* at this point im is allocated all memory, but it is only at most
half full; we need to relocate im to half space and to allocate
buff_0 the other half; since we use new, there is no realloc, so
we will copy to a file...*/
-
+
{
- //copy im to a stream and free its memory
- T x;
- AMI_STREAM<T> tmpstr;
- for (unsigned int i=0; i<pqcurrentsize; i++) {
- im->extract_min(x);
- ae = tmpstr.write_item(x);
+ //copy im to a stream and free its memory
+ T x;
+ AMI_STREAM<T> tmpstr;
+ for (unsigned int i=0; i<pqcurrentsize; i++) {
+ im->extract_min(x);
+ ae = tmpstr.write_item(x);
+ assert(ae == AMI_ERROR_NO_ERROR);
+ }
+ delete im; im = NULL;
+ LOG_avail_memo();
+
+ //allocate pq and buff_0 half size
+ bufsize = pqcapacity;
+ cout << "EM_PQUEUE: allocating im_buffer size=" << bufsize
+ << " total " << (float)bufsize*sizeof(T)/(1<<20) << "MB\n";
+ cout.flush();
+ buff_0 = new im_buffer<T>(bufsize);
+ assert(buff_0);
+ cout << "EM_PQUEUE: allocating pq size=" << pqsize
+ << " total " << (float)pqcapacity*sizeof(T)/(1<<20) << "MB\n";
+ cout.flush();
+ pq = new MinMaxHeap<T>(pqsize);
+ assert(pq);
+
+ //fill pq from tmp stream
+ ae = tmpstr.seek(0);
assert(ae == AMI_ERROR_NO_ERROR);
- }
- delete im;
- LOG_avail_memo();
-
- //allocate pq and buff_0 half size
- bufsize = pqcapacity;
- cout << "EM_PQUEUE: allocating im_buffer size=" << bufsize
- << " total " << (float)bufsize*sizeof(T)/(1<<20) << "MB\n";
- cout.flush();
- buff_0 = new im_buffer<T>(bufsize);
- assert(buff_0);
- cout << "EM_PQUEUE: allocating pq size=" << pqcapacity
- << " total " << (float)pqcapacity*sizeof(T)/(1<<20) << "MB\n";
- cout.flush();
- pq = new MinMaxHeap<T>(pqcapacity);
- assert(pq);
-
- //fill pq from tmp stream
- ae = tmpstr.seek(0);
- assert(ae == AMI_ERROR_NO_ERROR);
- T *elt;
- for (unsigned int i=0; i<pqcurrentsize; i++) {
- ae = tmpstr.read_item(&elt);
- assert(ae == AMI_ERROR_NO_ERROR);
- pq->insert(*elt);
- }
- assert(pq->size() == pqcurrentsize);
+ T *elt;
+ for (unsigned int i=0; i<pqcurrentsize; i++) {
+ ae = tmpstr.read_item(&elt);
+ assert(ae == AMI_ERROR_NO_ERROR);
+ pq->insert(*elt);
+ }
+ assert(pq->size() == pqcurrentsize);
}
//estimate buf_arity
@@ -430,6 +432,11 @@
buf_arity = 1;
}
+ //added on 05/16/2005 by Laura
+ if (buf_arity > MAX_STREAMS_OPEN) {
+ buf_arity = MAX_STREAMS_OPEN;
+ }
+
//allocate ext memory buffer array
char str[200];
sprintf(str,"em_pqueue: allocating array of %ld buff pointers\n",
@@ -475,9 +482,13 @@
template<class T, class Key>
em_pqueue<T,Key>::~em_pqueue() {
//delete in memory pqueue
- if (pq) delete pq;
+ if (pq) {
+ delete pq; pq = NULL;
+ }
//delete in memory buffer
- if (buff_0) delete buff_0;
+ if (buff_0) {
+ delete buff_0; buff_0 = NULL;
+ }
//delete ext memory buffers
for (unsigned short i=0; i< crt_buf; i++) {
if (buff[i]) delete buff[i];
@@ -582,10 +593,9 @@
}
//merge pqsize smallest elements from each buffer into a new stream
ExtendedMergeStream** outstreams;
- //gcc-3.4 doesn't allows (TYPE*)[SIZE] declarations
- //use TYPE*[SIZE]
- outstreams = new ExtendedMergeStream*[crt_buf];
+ outstreams = new ExtendedMergeStream* [crt_buf];
+ /* gets stream of smallest pqsize elts from each level */
for (unsigned short i=0; i< crt_buf; i++) {
MY_LOG_DEBUG_ID(crt_buf);
outstreams[i] = new ExtendedMergeStream();
@@ -596,6 +606,7 @@
//print_stream(outstreams[i]); cout.flush();
}
+ /* merge above streams into pqsize elts in minstream */
if (crt_buf == 1) {
//just one level; make common case faster :)
merge_bufs2pq(outstreams[0]);
@@ -607,20 +618,21 @@
//cout << "merging streams\n";
ae = merge_streams(outstreams, crt_buf, minstream, pqsize);
assert(ae == AMI_ERROR_NO_ERROR);
- for (unsigned short i=0; i< crt_buf; i++) {
+ for (int i=0; i< crt_buf; i++) {
delete outstreams[i];
}
delete [] outstreams;
-
+
//copy the minstream in the internal pqueue while merging with buff_0;
//the smallest <pqsize> elements between minstream and buff_0 will be
//inserted in internal pqueue;
//also, the elements from minstram which are inserted in pqueue must be
//marked as deleted in the source streams;
merge_bufs2pq(minstream);
- delete minstream;
+ delete minstream; minstream = NULL;
//cout << "after merge_bufs2pq: \n" << *this << "\n";
}
+
XXX assert(pq->size());
XXX cerr << "fillpq done" << endl;
return true;
@@ -969,7 +981,7 @@
#endif
T val = x;
- MY_LOG_DEBUG_ID("\n\n\nEM_PQUEUE::INSERT");
+ MY_LOG_DEBUG_ID("\nEM_PQUEUE::INSERT");
//if structure is empty insert x in pq; not worth the trouble..
if ((crt_buf == 0) && (buff_0->is_empty())) {
if (!pq->full()) {
@@ -1244,6 +1256,7 @@
assert(instreams[i]);
//rewind stream
if ((ami_err = instreams[i]->seek(bos[i])) != AMI_ERROR_NO_ERROR) {
+ cerr << "WARNING!!! EARLY EXIT!!!" << endl;
return ami_err;
}
/* read first item */
@@ -1261,6 +1274,7 @@
j++;
break;
default:
+ cerr << "WARNING!!! EARLY EXIT!!!" << endl;
return ami_err;
}
}
@@ -1283,6 +1297,7 @@
//write min item to output stream
out = ExtendedEltMergeType<T,Key>(*in_objects[i], bufid, i);
if ((ami_err = outstream->write_item(out)) != AMI_ERROR_NO_ERROR) {
+ cerr << "WARNING!!! EARLY EXIT!!!" << endl;
return ami_err;
}
//cout << "wrote " << out << "\n";
@@ -1302,6 +1317,7 @@
}
break;
default:
+ cerr << "WARNING!!! early breakout!!!" << endl;
return ami_err;
}
//cout << "PQ: " << mergeheap << "\n";
@@ -1341,68 +1357,80 @@
assert(arity> 1);
//Pointers to current leading elements of streams
- ExtendedEltMergeType<T,Key>* in_objects[arity];
+ ExtendedEltMergeType<T,Key> in_objects[arity];
+
AMI_err ami_err;
- unsigned int i;
+ //unsigned int i;
unsigned int nonEmptyRuns=0; //count number of non-empty runs
//array initialized with first element from each stream (only non-null keys
//must be included)
MEMORY_LOG("em_pqueue::merge_streams: allocate keys array\n");
+
merge_key<Key>* keys = new merge_key<Key> [arity];
assert(keys);
//rewind and read the first item from every stream
- for (i = 0; i < arity ; i++ ) {
+ for (int i = 0; i < arity ; i++ ) {
//rewind stream
if ((ami_err = instreams[i]->seek(0)) != AMI_ERROR_NO_ERROR) {
return ami_err;
}
//read first item
- ami_err = instreams[i]->read_item(&(in_objects[i]));
+ ExtendedEltMergeType<T,Key> *objp;
+ ami_err = instreams[i]->read_item(&objp);
switch(ami_err) {
case AMI_ERROR_NO_ERROR:
- keys[nonEmptyRuns] = merge_key<Key>(in_objects[i]->getPriority(), i);
+ in_objects[i] = *objp;
+ keys[nonEmptyRuns] = merge_key<Key>(in_objects[i].getPriority(), i);
nonEmptyRuns++;
break;
case AMI_ERROR_END_OF_STREAM:
- in_objects[i] = NULL;
break;
default:
return ami_err;
}
}
-
+ assert(nonEmptyRuns <= arity);
+
//build heap from the array of keys
- pqheap_t1<merge_key<Key> > mergeheap(keys, nonEmptyRuns);
+ pqheap_t1<merge_key<Key> > mergeheap(keys, nonEmptyRuns); /* takes ownership of keys */
//repeatedly extract_min from heap and insert next item from same stream
long extracted = 0;
//rewind output buffer
ami_err = outstream->seek(0);
assert(ami_err == AMI_ERROR_NO_ERROR);
+
while (!mergeheap.empty() && (extracted < K)) {
//find min key and id of stream it comes from
- i = mergeheap.min().stream_id();
+ int id = mergeheap.min().stream_id();
//write min item to output stream
- if ((ami_err = outstream->write_item(*in_objects[i]))
- != AMI_ERROR_NO_ERROR) {
+ assert(id < nonEmptyRuns);
+ assert(id >= 0);
+ assert(mergeheap.size() == nonEmptyRuns);
+ ExtendedEltMergeType<T,Key> obj = in_objects[id];
+ if ((ami_err = outstream->write_item(obj)) != AMI_ERROR_NO_ERROR) {
return ami_err;
}
//cout << "wrote " << *in_objects[i] << "\n";
//extract the min from the heap and insert next key from same stream
- ami_err = instreams[i]->read_item(&(in_objects[i]));
+ assert(id < nonEmptyRuns);
+ assert(id >= 0);
+ ExtendedEltMergeType<T,Key> *objp;
+ ami_err = instreams[id]->read_item(&objp);
switch(ami_err) {
case AMI_ERROR_NO_ERROR:
{
- merge_key<Key> tmp = merge_key<Key>(in_objects[i]->getPriority(), i);
+ in_objects[id] = *objp;
+ merge_key<Key> tmp = merge_key<Key>(in_objects[id].getPriority(), id);
mergeheap.delete_min_and_insert(tmp);
}
extracted++; //update nb of extracted elements
break;
case AMI_ERROR_END_OF_STREAM:
- in_objects[i] = NULL;
+ mergeheap.delete_min();
break;
default:
return ami_err;
@@ -1415,7 +1443,6 @@
//IF I DELETE KEYS EXPLICITELY, THEY WILL BE DELETED AGAIN BY
//DESTRUCTOR, AND EVERYTHING SCREWS UP..
-
MY_LOG_DEBUG_ID("merge_streams: done");
return AMI_ERROR_NO_ERROR;
}
@@ -1424,6 +1451,22 @@
//************************************************************/
template<class T, class Key>
void
+em_pqueue<T,Key>::clear() {
+ pq->clear();
+ buff_0->clear();
+
+ for(int i=0; i<crt_buf; i++) {
+ if(buff[i]) {
+ delete buff[i]; buff[i] = NULL;
+ }
+ }
+ crt_buf = 0;
+}
+
+
+//************************************************************/
+template<class T, class Key>
+void
em_pqueue<T,Key>::print_range() {
cout << "EM_PQ: [pq=" << pqsize
<< ", b=" << bufsize
Modified: grass/trunk/include/iostream/imbuffer.h
===================================================================
--- grass/trunk/include/iostream/imbuffer.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/imbuffer.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -16,6 +16,7 @@
*
*****************************************************************************/
+
#ifndef __IMBUFFER_H
#define __IMBUFFER_H
@@ -146,6 +147,12 @@
#endif
}
+ //reset buffer (delete all data); don't delete memory
+ void clear() {
+ size = 0;
+ sorted = false;
+ }
+
//reset buffer: keep n elements starting at position start
void reset(unsigned long start, unsigned long n);
Modified: grass/trunk/include/iostream/mem_stream.h
===================================================================
--- grass/trunk/include/iostream/mem_stream.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/mem_stream.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,7 @@
+
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -84,7 +85,7 @@
template<class T>
AMI_err MEM_STREAM<T>::name(char **stream_name) {
- char const* path = "dummy";
+ const char *path = "dummy";
*stream_name = new char [strlen(path) + 1];
strcpy(*stream_name, path);
Modified: grass/trunk/include/iostream/minmaxheap.h
===================================================================
--- grass/trunk/include/iostream/minmaxheap.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/minmaxheap.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -65,16 +65,16 @@
template <class T>
class BasicMinMaxHeap {
-
protected:
HeapIndex maxsize;
HeapIndex lastindex; // last used position (0 unused) (?)
T *A;
+
+protected:
/* couple of memory mgt functions to keep things consistent */
static T *allocateHeap(HeapIndex n);
static void freeHeap(T *);
- virtual void grow()=0;
-
+
public:
BasicMinMaxHeap(HeapIndex size) : maxsize(size) {
char str[100];
@@ -93,7 +93,10 @@
};
bool empty(void) const { return size() == 0; };
- HeapIndex size() const;
+ HeapIndex size() const {
+ assert(A || !lastindex);
+ return lastindex;
+ };
T get(HeapIndex i) const { assert(i <= size()); return A[i]; }
@@ -111,8 +114,10 @@
bool extract_all_min(T& elt);
void reset();
+ void clear(); /* mark all the data as deleted, but don't do free */
void destructiveVerify();
+
void verify();
void print() const;
@@ -127,17 +132,18 @@
return s;
}
+protected:
+ virtual void grow()=0;
private:
- // Changed log2() to log2_() just in case log2() macro was already
- // defined in math.h: e.g., log2() is defined in Cygwin gcc by default.
- long log2_(long n) const;
- int isOnMaxLevel(HeapIndex i) const { return (log2_(i) % 2); };
+ long log2(long n) const;
+ int isOnMaxLevel(HeapIndex i) const { return (log2(i) % 2); };
int isOnMinLevel(HeapIndex i) const { return !isOnMaxLevel(i); };
HeapIndex leftChild(HeapIndex i) const { return 2*i; };
HeapIndex rightChild(HeapIndex i) const { return 2*i + 1; };
int hasRightChild(HeapIndex i) const { return (rightChild(i) <= size()); };
+ int hasRightChild(HeapIndex i, HeapIndex *c) const { return ((*c=rightChild(i)) <= size()); };
HeapIndex parent(HeapIndex i) const { return (i/2); };
HeapIndex grandparent(HeapIndex i) const { return (i/4); };
int hasChildren(HeapIndex i) const { return (2*i) <= size(); }; // 1 or more
@@ -164,20 +170,12 @@
// index 0 is invalid
// index <= size
-
// ----------------------------------------------------------------------
-template <class T>
-HeapIndex BasicMinMaxHeap<T>::size() const {
- assert(A || !lastindex);
- return lastindex;
-}
-// ----------------------------------------------------------------------
-
template <class T>
-long BasicMinMaxHeap<T>::log2_(long n) const {
+long BasicMinMaxHeap<T>::log2(long n) const {
long i=-1;
- // let log2_(0)==-1
+ // let log2(0)==-1
while(n) {
n = n >> 1;
i++;
@@ -262,8 +260,8 @@
// p is smallest of left child, its grandchildren
minpos = p;
- if(hasRightChild(i)) {
- p = rightChild(i);
+ if(hasRightChild(i,&p)) {
+ //p = rightChild(i);
if(hasChildren(p)) {
q = smallestChild(p);
if(A[p] > A[q]) p = q;
@@ -291,8 +289,8 @@
// p is smallest of left child, its grandchildren
maxpos = p;
- if(hasRightChild(i)) {
- p = rightChild(i);
+ if(hasRightChild(i,&p)) {
+ //p = rightChild(i);
if(hasChildren(p)) {
q = largestChild(p);
if(A[p] < A[q]) p = q;
@@ -624,7 +622,15 @@
}
// ----------------------------------------------------------------------
+
template <class T>
+void
+BasicMinMaxHeap<T>::clear() {
+ lastindex = 0;
+}
+
+// ----------------------------------------------------------------------
+template <class T>
T *
BasicMinMaxHeap<T>::allocateHeap(HeapIndex n) {
T *p;
@@ -708,17 +714,11 @@
}
-
// ----------------------------------------------------------------------
// ----------------------------------------------------------------------
template <class T>
class MinMaxHeap : public BasicMinMaxHeap<T> {
-
-//using BasicMinMaxHeap<T>::maxsize;
-//using BasicMinMaxHeap<T>::lastindex;
-//using BasicMinMaxHeap<T>::size;
-
public:
MinMaxHeap(HeapIndex size) : BasicMinMaxHeap<T>(size) {};
virtual ~MinMaxHeap() {};
@@ -727,7 +727,7 @@
HeapIndex fill(T* arr, HeapIndex n);
protected:
- virtual void grow() { assert(0); exit(1); };
+ virtual void grow() { fprintf(stderr, "MinMaxHeap::grow: not implemented\n"); assert(0); exit(1); };
};
// ----------------------------------------------------------------------
@@ -738,12 +738,12 @@
HeapIndex MinMaxHeap<T>::fill(T* arr, HeapIndex n) {
HeapIndex i;
//heap must be empty
- assert(get_maxsize()==0);
+ assert(this->size()==0);
for (i = 0; !full() && i<n; i++) {
insert(arr[i]);
}
if (i < n) {
- assert(i == get_maxsize());
+ assert(i == this->maxsize);
return n - i;
} else {
return 0;
@@ -756,13 +756,9 @@
template <class T>
class UnboundedMinMaxHeap : public BasicMinMaxHeap<T> {
-
-using BasicMinMaxHeap<T>::A;
-using BasicMinMaxHeap<T>::maxsize;
-using BasicMinMaxHeap<T>::size;
-
public:
UnboundedMinMaxHeap() : BasicMinMaxHeap<T>(MMHEAP_INITIAL_SIZE) {};
+ UnboundedMinMaxHeap(HeapIndex size) : BasicMinMaxHeap<T>(size) {};
virtual ~UnboundedMinMaxHeap() {};
protected:
virtual void grow();
@@ -770,14 +766,18 @@
template <class T>
void UnboundedMinMaxHeap<T>::grow() {
- T *old = A;
- maxsize *= 2;
+ T *old = this->A;
+ this->maxsize *= 2;
+ assert(this->maxsize > 0);
+
if(old) {
- A = allocateHeap(maxsize); /* allocate a new array */
+ HeapIndex n = this->size();
+ this->A = allocateHeap(this->maxsize); /* allocate a new array */
/* copy over the old values */
- for(int i=0; i<size()+1; i++) {
- A[i] = old[i];
+ assert(this->maxsize > n);
+ for(HeapIndex i=0; i<=n; i++) { /* why extra value? -RW */
+ this->A[i] = old[i];
}
freeHeap(old); /* free up old storage */
}
Modified: grass/trunk/include/iostream/mm.h
===================================================================
--- grass/trunk/include/iostream/mm.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/mm.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -16,7 +16,6 @@
*
*****************************************************************************/
-
#ifndef _MM_H
#define _MM_H
@@ -83,8 +82,10 @@
// flag indicates how we are keeping track of memory
static MM_mode register_new;
-protected:
- // private methods, only called by operators new and delete.
+//protected:
+// // private methods, only called by operators new and delete.
+
+public: // Need to be accessible from pqueue constructor
MM_err register_allocation (size_t sz);
MM_err register_deallocation(size_t sz);
@@ -110,6 +111,7 @@
friend class mm_register_init;
friend void * operator new(size_t);
+ friend void * operator new[](size_t);
friend void operator delete(void *);
friend void operator delete[](void *);
};
Modified: grass/trunk/include/iostream/mm_utils.h
===================================================================
--- grass/trunk/include/iostream/mm_utils.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/mm_utils.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -16,14 +16,13 @@
*
*****************************************************************************/
-
#ifndef MM_UTIL_H
#define MM_UTIL_H
-
#include "mm.h"
#include <string>
+
void LOG_avail_memo();
size_t getAvailableMemory();
Modified: grass/trunk/include/iostream/pqheap.h
===================================================================
--- grass/trunk/include/iostream/pqheap.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/pqheap.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -16,14 +16,13 @@
*
*****************************************************************************/
-
#ifndef _PQHEAP_H
#define _PQHEAP_H
#include <assert.h>
#include <stdlib.h>
-#define PQHEAP_MEM_DEBUG 0
+#define PQHEAP_MEM_DEBUG if(0)
//HEAPSTATUS can be defined at compile time
@@ -126,9 +125,7 @@
inline unsigned int num_elts(void);
// How many elements? sorry - i could never remember num_elts
- inline unsigned int size(void) {
- return cur_elts;
- }
+ inline unsigned int size(void) const { return cur_elts; };
// Min
inline bool min(T& elt);
@@ -192,13 +189,12 @@
elements = new T [size];
cout << "pqheap_t1: register memory\n";
cout.flush();
-#if PQHEAP_MEM_DEBUG
- cout << "pqheap_t1::pq_heap_t1: allocate\n";
- MMmanager.print();
-#endif
+ PQHEAP_MEM_DEBUG cout << "pqheap_t1::pq_heap_t1: allocate\n";
+ // PQHEAP_MEM_DEBUG MMmanager.print();
+
if (!elements) {
- cout << "could not allocate priority queue: insufficient memory..\n";
+ cerr << "could not allocate priority queue: insufficient memory..\n";
exit(1);
}
assert(elements);
@@ -223,6 +219,13 @@
template <class T>
inline
pqheap_t1<T>::pqheap_t1(T* a, unsigned int size) {
+ {
+ static int flag = 0;
+ if(!flag) {
+ cerr << "Using slow build in pqheap_t1" << endl;
+ flag = 1;
+ }
+ }
elements = a;
max_elts = size;
@@ -503,6 +506,7 @@
template <class T>
inline void
pqheap_t1<T>::delete_min_and_insert(const T &x) {
+ assert(cur_elts);
elements[0] = x;
heapify(0);
}
Modified: grass/trunk/include/iostream/queue.h
===================================================================
--- grass/trunk/include/iostream/queue.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/queue.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -16,8 +16,6 @@
*
*****************************************************************************/
-
-
#ifndef QUEUE_H
#define QUEUE_H
@@ -48,6 +46,9 @@
template<class T>
queue<T>::queue(int vsize) : size(vsize) {
+
+ if(size <= 0) size = 64; /* default */
+
data = new T[size];
head = 0;
tail = 0;
Modified: grass/trunk/include/iostream/quicksort.h
===================================================================
--- grass/trunk/include/iostream/quicksort.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/quicksort.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -41,11 +41,13 @@
// Try to get a good partition value and avoid being bitten by already
// sorted input.
+ //ptpart = data + (random() % n);
#ifdef __MINGW32__
ptpart = data + (rand() % n);
#else
ptpart = data + (random() % n);
#endif
+
tpart = *ptpart;
*ptpart = data[0];
data[0] = tpart;
Modified: grass/trunk/include/iostream/replacementHeap.h
===================================================================
--- grass/trunk/include/iostream/replacementHeap.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/replacementHeap.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -16,7 +16,6 @@
*
*****************************************************************************/
-
#ifndef REPLACEMENT_QUEUE_H
#define REPLACEMENT_QUEUE_H
@@ -95,7 +94,7 @@
ReplacementHeap<T,Compare>(size_t arity, queue<char*>* runList);
//delete array mergeHeap
- ~ReplacementHeap();
+ ~ReplacementHeap<T,Compare>();
//is heap empty?
int empty() const {
@@ -131,7 +130,7 @@
/*****************************************************************/
template<class T,class Compare>
ReplacementHeap<T,Compare>::ReplacementHeap(size_t g_arity,
- queue<char*>* runList) {
+ queue<char*>* runList) {
char* name=NULL;
assert(runList && g_arity > 0);
@@ -399,4 +398,3 @@
#endif
-
Modified: grass/trunk/include/iostream/replacementHeapBlock.h
===================================================================
--- grass/trunk/include/iostream/replacementHeapBlock.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/replacementHeapBlock.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -16,7 +16,6 @@
*
*****************************************************************************/
-
#ifndef REPLACEMENT_HEAPBLOCK_H
#define REPLACEMENT_HEAPBLOCK_H
@@ -97,7 +96,7 @@
ReplacementHeapBlock<T,Compare>(queue <MEM_STREAM<T>*> *runList);
//delete array mergeHeap
- ~ReplacementHeapBlock();
+ ~ReplacementHeapBlock<T,Compare>();
//is heap empty?
int empty() const {
@@ -375,4 +374,3 @@
#endif
-
Modified: grass/trunk/include/iostream/rtimer.h
===================================================================
--- grass/trunk/include/iostream/rtimer.h 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/include/iostream/rtimer.h 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -78,8 +78,8 @@
perror("rusage/gettimeofday"); \
exit(1); \
}
-
+
#define rt_u_useconds(rt) \
(((double)rt.rut2.ru_utime.tv_usec + \
(double)rt.rut2.ru_utime.tv_sec*1000000) \
@@ -98,16 +98,21 @@
- ((double)rt.tv1.tv_usec + \
(double)rt.tv1.tv_sec*1000000))
+
#endif /* __MINGW32__ */
+
+
+
/* not required to be called, but makes values print as 0.
obviously a hack */
#define rt_zero(rt) bzero(&(rt),sizeof(Rtimer));
-
+
#define rt_seconds(rt) (rt_w_useconds(rt)/1000000)
#define rt_sprint(buf, rt) rt_sprint_safe(buf,rt)
char * rt_sprint_safe(char *buf, Rtimer rt);
+
#endif /* RTIMER_H */
Modified: grass/trunk/lib/iostream/ami_stream.cc
===================================================================
--- grass/trunk/lib/iostream/ami_stream.cc 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/lib/iostream/ami_stream.cc 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -16,7 +16,6 @@
*
*****************************************************************************/
-
#include <sys/types.h>
#include <sys/stat.h>
#include <stdio.h>
@@ -26,8 +25,26 @@
#include <errno.h>
#include <unistd.h>
+//#include <ami_stream.h>
#include <grass/iostream/ami_stream.h>
+
+char *ami_str_error[] = {
+ "AMI_ERROR_NO_ERROR",
+ "AMI_ERROR_IO_ERROR",
+ "AMI_ERROR_END_OF_STREAM",
+ "AMI_ERROR_OUT_OF_RANGE",
+ "AMI_ERROR_READ_ONLY",
+ "AMI_ERROR_OS_ERROR",
+ "AMI_ERROR_MM_ERROR",
+ "AMI_ERROR_OBJECT_INITIALIZATION",
+ "AMI_ERROR_PERMISSION_DENIED",
+ "AMI_ERROR_INSUFFICIENT_MAIN_MEMORY",
+ "AMI_ERROR_INSUFFICIENT_AVAILABLE_STREAMS",
+ "AMI_ERROR_ENV_UNDEFINED",
+ "AMI_ERROR_NO_MAIN_MEMORY_OPERATION",
+};
+
/**********************************************************************/
/* creates a random file name, opens the file for reading and writing
and and returns a file descriptor */
@@ -39,9 +56,13 @@
// get the dir
base_dir = getenv(STREAM_TMPDIR);
- assert(base_dir);
+ if(!base_dir) {
+ fprintf(stderr, "ami_stream: %s not set\n", STREAM_TMPDIR);
+ assert(base_dir);
+ exit(1);
+ }
+ sprintf(tmp_path, "%s/%s_XXXXXX", base_dir, base.c_str());
- sprintf(tmp_path, "%s/%s_XXXXXX", base_dir, base.c_str());
#ifdef __MINGW32__
fd = mktemp(tmp_path) ? open(tmp_path, O_CREAT|O_EXCL|O_RDWR, 0600) : -1;
#else
@@ -73,17 +94,23 @@
case AMI_WRITE_STREAM:
fp = fdopen(fd, "wb");
break;
+ case AMI_APPEND_WRITE_STREAM:
+ fp = fdopen(fd, "ab");
+ break;
case AMI_APPEND_STREAM:
fp = fdopen(fd, "ab+");
break;
case AMI_READ_WRITE_STREAM:
- fp = fdopen(fd, "rb+");
- if (!fp) {
- //if file does not exist, create it
- fp = fdopen(fd, "wb+");
- }
- break;
+ fp = fdopen(fd, "rb+");
+ if (!fp) {
+ //if file does not exist, create it
+ fp = fdopen(fd, "wb+");
+ }
+ break;
}
+ if(!fp) {
+ perror("fdopen");
+ }
assert(fp);
return fp;
@@ -105,6 +132,9 @@
case AMI_WRITE_STREAM:
fp = fopen(pathname, "wb");
break;
+ case AMI_APPEND_WRITE_STREAM:
+ fp = fopen(pathname, "ab");
+ break;
case AMI_APPEND_STREAM:
fp = fopen(pathname, "ab+");
assert(fp);
@@ -121,7 +151,7 @@
break;
}
if (!fp) {
- perror("cannot open stream");
+ perror(pathname);
assert(0);
exit(1);
}
Modified: grass/trunk/lib/iostream/mm.cc
===================================================================
--- grass/trunk/lib/iostream/mm.cc 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/lib/iostream/mm.cc 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -24,6 +24,8 @@
#include <assert.h>
#include <iostream>
using namespace std;
+
+//#include <mm.h>
#include <grass/iostream/mm.h>
#define MM_DEBUG if(0)
@@ -254,6 +256,57 @@
/* ************************************************************ */
+void* operator new[] (size_t sz) {
+ void *p;
+
+ MM_DEBUG cout << "new: sz=" << sz << ", register "
+ << sz+SIZE_SPACE << "B ,";
+
+ if (MM_manager.register_allocation (sz + SIZE_SPACE) != MM_ERROR_NO_ERROR){
+ //must be MM_ERROR_INSUF_SPACE
+ switch(MM_manager.register_new) {
+
+ case MM_ABORT_ON_MEMORY_EXCEEDED:
+ cerr << "MM error: limit ="<< MM_manager.memory_limit() <<"B. "
+ << "allocating " << sz << "B. "
+ << "limit exceeded by "
+ << MM_manager.memory_used() - MM_manager.memory_limit()<<"B."
+ << endl;
+ assert (0); // core dump if debugging
+ exit (1);
+ break;
+
+ case MM_WARN_ON_MEMORY_EXCEEDED:
+ cerr << "MM warning: limit="<<MM_manager.memory_limit() <<"B. "
+ << "allocating " << sz << "B. "
+ << " limit exceeded by "
+ << MM_manager.memory_used() - MM_manager.memory_limit()<<"B."
+ << endl;
+ break;
+
+ case MM_IGNORE_MEMORY_EXCEEDED:
+ break;
+ }
+ }
+
+ p = malloc(sz + SIZE_SPACE);
+
+ if (!p) {
+ cerr << "new: out of memory while allocating " << sz << "B" << endl;
+ assert(0);
+ exit (1);
+ }
+
+ *((size_t *) p) = sz;
+
+ MM_DEBUG cout << "ptr=" << (void*) (((char *) p) + SIZE_SPACE) << endl;
+
+ return ((char *) p) + SIZE_SPACE;
+}
+
+
+
+/* ************************************************************ */
void* operator new (size_t sz) {
void *p;
@@ -319,7 +372,8 @@
//destructor for something that was not allocated with new
//e.g. ofstream str(name) ---- ~ofstream() called ==> ptr=NULL
- //assert(0);
+ //who wrote the above comment? -RW
+ assert(0);
//exit(1);
return;
}
Modified: grass/trunk/lib/iostream/mm_utils.cc
===================================================================
--- grass/trunk/lib/iostream/mm_utils.cc 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/lib/iostream/mm_utils.cc 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -17,18 +17,14 @@
*****************************************************************************/
#include <sys/types.h>
+#include <sys/mman.h>
#include <ctype.h>
-
-#if __GNUC__ > 3 || (__GNUC__ == 3 && __GNUC_MINOR__ >= 1)
#include <ostream>
-#else
-#include <ostream.h>
-#endif
-
#include <iostream>
using namespace std;
#include <stdio.h>
+//#include <mm.h>
#include <grass/iostream/mm.h>
@@ -46,7 +42,8 @@
return fmem;
}
-void MEMORY_LOG(std::string str) {
+void
+MEMORY_LOG(std::string str) {
printf("%s", str.c_str());
fflush(stdout);
}
Modified: grass/trunk/lib/iostream/rtimer.cc
===================================================================
--- grass/trunk/lib/iostream/rtimer.cc 2008-08-07 19:02:45 UTC (rev 32624)
+++ grass/trunk/lib/iostream/rtimer.cc 2008-08-07 21:13:13 UTC (rev 32625)
@@ -1,6 +1,6 @@
/****************************************************************************
*
- * MODULE: r.terraflow
+ * MODULE: iostream
*
* COPYRIGHT (C) 2007 Laura Toma
*
@@ -16,12 +16,13 @@
*
*****************************************************************************/
-
#include <sys/time.h>
+#include <sys/resource.h>
#include <stdio.h>
#include <string.h>
#include <strings.h>
+//#include <rtimer.h>
#include <grass/iostream/rtimer.h>
char *
More information about the grass-commit
mailing list