/* * supersort.c * * Copyright 2005 Robert Ricci * ----------------------------------------------------------------------------- * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * If you don't know how to find a copy of the GNU General Public License, * well, welcome to the 21st century! Or, write to the Free Software * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * ----------------------------------------------------------------------------- * * Uses a two-pass algorithm: The first pass does a Radix sort on chunks (tens * of MB) of floats from the input file, and writes the sorted chunks to the * temporary file. The second stage does a parallel merge on all of the chunks * using a heap. Obviously, we can only hold so much data to sort at a time, so * we pull from the head of all chunks in parallel, going back to disk to get * more when our data from each chunk runs dry. * * Known to compile on FreeBSD 5.x, 6.x, and Suse Linux 9 (should work on any * Linux with gcc 3.X) * * Building: * On FreeBSD: gcc -o supersort -pthread -O3 supersort.c * On Linux: gcc -o supersort -lpthread -O3 supersort.c * * See options below */ static const char rcsid[] = "$Id: supersort.c,v 1.16 2005/11/28 07:38:38 ricci Exp $"; #include #include #include #include #include #include #include #include #include #include #include #include #ifdef __linux__ #include #endif /* * Memory layouts: * (NOTE: drawn upside down) * * ##### Stage 1: * CHUNK_SIZE = SCRATCH_SIZE / (N_PROCESSORS + 1) * scratch_bottom = * --------------------------------- * CHUNK_SIZE * --------------------------------- * repeat (N_PROCESSORS + 1) times * * stage 2: * scratch_bottom = * entries = * --------------------------------- * n_chunks * sizeof(struct entry) * --------------------------------- * heap = * --------------------------------- * n_chunks * sizeof(struct entry *) * --------------------------------- * write_buffers[x].buffer = * --------------------------------- * OUTBUF_SIZE * --------------------------------- * OUTBUF_SIZE * --------------------------------- * buffers = * --------------------------------- * leftover / n_chunks * --------------------------------- */ /* * Options controlling sort behavior (all sizes are in bytes) */ /* * Number of processors this sort will be run on */ #define N_PROCESSORS 2 /* * Pick a sort - only define one of these */ /* My own implementation of Radix Sort */ #define RADSORT /* The C library's implementation of quicksort */ /* #define LIB_QSORT */ /* * Use the flipping trick to handle negative floats in the radix sort */ #define FLOATFLIP /* * Round sizes of data structures to disk block sizes */ #define ROUND_SIZES /* * Use integer instructions for FP compares */ // #define INTEGER_FP /* * How many sorting threads to run (note, we are disk bound, so making this * higher than 1 doesn't really help) */ #define N_SORT_THREADS N_PROCESSORS /* * Do we try to optimize disk I/O by doing transfers that are the size of the * filesystem blocksize? */ #define FS_BLOCKSIZE_IO /* * Amount of memory we malloc() on the heap for buffering, etc. */ #define SCRATCH_SIZE (64 << 20) // 64 MByte /* * Size and number of buffers to use for write buffering in the second stage */ #define N_WRITE_BUFFERS 2 #define STAGE2_OUTBUF_SIZE (1048576/2) /* * Do sync loads in stage 2 */ /* #define STAGE2_LOAD_SYNC */ /* * Name of our temporary file */ #define TMPFILE "supersort.tmp" /* * Turn on various informational messages, as well as some timing code */ #define DEVELOPING /* * Turn on gobs of debugging output */ /* #define DEBUGGING */ /* * End of options */ /* * We have one extra chunk buffer so that each CPU can be sorting while the * disk is being read. */ #define N_CHUNKBUFFERS (N_PROCESSORS + 1) #ifndef INTEGER_FP #define FLOAT float #define FLOAT_LT(a,b) ((a) < (b)) #define FLOAT_EQ(a,b) ((a) == (b)) #define RADCAST (uint32_t*) #else /* * No floating point - use the formulas for doing FP using integer instructions * from Hacker's Delight */ #define FLOAT uint32_t #define FLOAT_LT(a,b) ((((int32_t)(a) >= 0) & ((int32_t)(a) < (int32_t)(b))) | (((int32_t)(a) < 0) & ((a) > (b)))) #define FLOAT_EQ(a,b) ((a) == (b)) #define RADCAST #endif /* * Bottom and top of the scratch buffer */ void *scratch_bottom; void *scratch_top; /* * If we're doing a radix sort, we have to cut the chunk size in half, because * it's not an inplace sort. */ #ifdef RADSORT uint32_t CHUNK_SIZE = ((SCRATCH_SIZE / (N_CHUNKBUFFERS)) / 2); #else uint32_t CHUNK_SIZE = (SCRATCH_SIZE / (N_CHUNKBUFFERS)); #endif /* * Information about the chunk buffers for stage 1, used by the sort threads * and the disk thread to communicate. */ typedef enum { CHUNK_EMPTY, CHUNK_FULL, CHUNK_SORTED, CHUNK_INUSE } chunk_state_t; struct chunk_info_t { int number; int size; chunk_state_t state; FLOAT *read_buffer; FLOAT *write_buffer; }; int chunks_to_read, chunks_to_write, chunks_to_sort, chunks_inuse; struct chunk_info_t chunk_info[N_CHUNKBUFFERS]; /* Used to lock the chunk metadata */ pthread_mutex_t chunk_mutex; /* Signaled when a read into a chunk buffer is done */ pthread_cond_t chunk_read_cond; /* Signaled when a chunk buffer has been sorted */ pthread_cond_t chunk_sorted_cond; /* * Information about the write buffers for stage 2 - used by the sorting * and disk threads to communicate. */ typedef enum { BUFFER_EMPTY, BUFFER_FULL, BUFFER_INUSE } buffer_state_t; struct buffer_info_t { FLOAT *buffer; int size; buffer_state_t state; off_t offset; }; struct buffer_info_t write_buffers[N_WRITE_BUFFERS]; /* Used to lock the buffer metadata */ pthread_mutex_t buffer_mutex; /* Used to singal that a write buffer flush is being requested */ pthread_cond_t buffer_io_cond; /* Used to singal that a write buffer flush is finished */ pthread_cond_t buffer_io_done_cond; /* * Argments for the disk stages - necessary, because pthreads only allows you * to pass a single argument pointer to forked threads. */ struct disk_stage1_args { int infd; int tmpfd; }; struct disk_stage2_args { int tmpfd; int outfd; }; /* * 'bools' used to tell threads when a stage is done */ int stage1_done; int state2_done; /* Now calculated dynamically below */ uint32_t stack_size = 0; /* * Convenience macro, to get the top element of any element in the heap */ #define HEAP(x) (*(heap[(x)]->stack_current)) /* * Output Macros */ #ifdef DEBUGGING #define DEBUG(format, ...) fprintf(stderr,format, ## __VA_ARGS__) #else #define DEBUG(...) #endif #ifdef DEVELOPING #define INFO(format, ...) fprintf(stderr,format, ## __VA_ARGS__) #else #define INFO(format, ...) #endif /* * Entry on the heap - must be defined here because many things below reference * it. */ struct heap_entry { FLOAT *stack_top; FLOAT *stack_current; uint32_t chunk_number; uint32_t remaining; uint32_t end_of_chunk; off_t next_start_pos; }; /* * Forward declarations of helper functions below. * Whew - that's a lot! */ inline void read_data(int, FLOAT*, uint32_t, int, off_t); inline void write_data(int, FLOAT*, uint32_t, int, off_t); int round_down(int,int); void stage1(int,int); void *disk_stage1(void *); void stage2(int,int); void *disk_stage2(void *); void *sort_stage1(void *); void load_stack(int, struct heap_entry*); void load_stacks(int, struct heap_entry *, FLOAT *); void min_heapify_down(struct heap_entry**, const uint32_t, const uint32_t); void heap_pop(struct heap_entry**, uint32_t); void heap_insert(struct heap_entry**, uint32_t, struct heap_entry*); #ifdef RADSORT uint32_t *rad_sort(uint32_t *in, uint32_t *out, int count); #endif off_t file_size; int main(int argc, char **argv) { int infd, outfd, tmpfd; #ifdef DEVELOPING struct timeval stage1_start,stage1_stop,stage2_start,stage2_stop, stage1_total, stage2_total, total; float stage1_rate, stage2_rate, total_rate; #endif if (argc != 3) { fprintf(stderr,"usage: %s \n",argv[0]); exit(-1); } /* * Open up our files */ /* * TODO: Open any of these with O_DIRECT? (portable?) */ if ((infd = open(argv[1],O_RDONLY)) < 1) { fprintf(stderr,"Error opening input file %s\n",argv[1]); /* strerror(errno)); Linux does not like this */ exit(-1); } if ((outfd = open(argv[2],O_WRONLY|O_CREAT|O_TRUNC,S_IWUSR | S_IRUSR)) < 1) { fprintf(stderr,"Error opening output file %s: %s\n",argv[2], strerror(errno)); exit(-1); } if ((tmpfd = open(TMPFILE,O_RDWR|O_CREAT|O_TRUNC, S_IWUSR | S_IRUSR)) < 1) { fprintf(stderr,"Error opening temp file %s: %s\n",TMPFILE, strerror(errno)); exit(-1); } /* * Thanks to UNIX semantics, this file will not really go away until we * close it. This way, if we crash, it definitely gets removed. */ #ifndef DEVELOPING unlink(TMPFILE); #endif /* * Grab some space on the heap to work in * No malloc() allowed after this point! * Note - pthreads uses malloc, so this option is deprecated */ #ifdef ALLOC_SBRK scratch_bottom = sbrk(0); if (sbrk(SCRATCH_SIZE) == (void*)-1) { perror("Unable to allocate scratch memory:"); exit(-1); } scratch_top = sbrk(0); #else scratch_bottom = malloc(SCRATCH_SIZE); if (!scratch_bottom) { perror("Unable to allocate scratch memory:"); exit(-1); } scratch_top = scratch_bottom + SCRATCH_SIZE; #endif #ifdef DEVELOPING gettimeofday(&stage1_start,NULL); #endif /* * Run the first stage! */ stage1(infd, tmpfd); #ifdef DEVELOPING gettimeofday(&stage1_stop,NULL); #endif INFO("Switching stages\n"); #ifdef DEVELOPING gettimeofday(&stage2_start,NULL); #endif /* * Run the second stage. */ stage2(tmpfd, outfd); #ifdef DEVELOPING gettimeofday(&stage2_stop,NULL); #endif close(infd); close(tmpfd); close(outfd); /* * Print out a lot of fun timings */ #ifdef DEVELOPING timersub(&stage1_stop,&stage1_start,&stage1_total); timersub(&stage2_stop,&stage2_start,&stage2_total); timersub(&stage2_stop,&stage1_start,&total); stage1_rate = (file_size / 1024 / 1024) / (stage1_total.tv_sec + stage1_total.tv_usec/1000000.0); stage2_rate = (file_size / 1024 / 1024) / (stage2_total.tv_sec + stage2_total.tv_usec/1000000.0); total_rate = (file_size / 1024 / 1024) / (total.tv_sec + total.tv_usec/1000000.0); INFO("Stage 1: %li.%06li s (%f MB/sec)\n", stage1_total.tv_sec, stage1_total.tv_usec, stage1_rate); INFO("Stage 2: %li.%06li s (%f MB/sec)\n", stage2_total.tv_sec, stage2_total.tv_usec, stage2_rate); INFO("Total : %li.%06li s (%f MB/sec)\n", total.tv_sec, total.tv_usec, total_rate); #endif exit(0); } /* * Passed as a comparator to the C library's qsort() function */ #ifndef RADSORT inline int float_comparator(const FLOAT *a, const FLOAT *b) { if (FLOAT_LT(*a,*b)) { return -1; } if (FLOAT_EQ(*a,*b)) { return 0; } return 1; } #endif /* * Number of chunks in the file */ uint32_t n_chunks; /* * Size of the last chunk, since it might be smaller than the others. */ off_t last_chunk_size; /* * Size we decide on for disk IO */ size_t io_blocksize; /* ------------------------------------------------------------------------- * Stage 1 * ------------------------------------------------------------------------- */ /* * First stage: Sort chunks of the input file into the output file */ void stage1(int infd, int tmpfd) { struct stat sb; int i; pthread_t disk_thread; struct disk_stage1_args args; pthread_t sort_threads[N_SORT_THREADS]; /* * Find out how many chunks there are in the file and the optimal IO * blocksize */ fstat(infd,&sb); file_size = sb.st_size; io_blocksize = sb.st_blksize; INFO("Using I/O blocksize %i\n",io_blocksize); #ifdef ROUND_SIZES CHUNK_SIZE = round_down(CHUNK_SIZE,io_blocksize); #endif n_chunks = file_size / CHUNK_SIZE; last_chunk_size = file_size % CHUNK_SIZE; if (last_chunk_size == 0) { last_chunk_size = CHUNK_SIZE; } else { n_chunks++; } INFO("Using %i chunks of size %i\n",n_chunks,CHUNK_SIZE); INFO("Last chunk size %li\n",(unsigned long int)last_chunk_size); /* * Set up threading state */ for (i = 0; i < N_CHUNKBUFFERS; i++) { chunk_info[i].state = CHUNK_EMPTY; #ifdef RADSORT chunk_info[i].read_buffer = scratch_bottom + i * CHUNK_SIZE * 2; #else chunk_info[i].read_buffer = scratch_bottom + i * CHUNK_SIZE; chunk_info[i].write_buffer = scratch_bottom + i * CHUNK_SIZE; #endif } pthread_mutex_init(&chunk_mutex,NULL); pthread_cond_init(&chunk_read_cond,NULL); pthread_cond_init(&chunk_sorted_cond,NULL); chunks_to_read = chunks_to_write = chunks_to_sort = 0; /* * Launch disk thread */ stage1_done = 0; args.infd = infd; args.tmpfd = tmpfd; if (pthread_create(&disk_thread,NULL,disk_stage1, &args)) { perror("Error creating disk thread"); exit(-1); } /* * Launch sorting threads */ for (i = 0; i < N_SORT_THREADS; i++) { if (pthread_create(&(sort_threads[i]),NULL,sort_stage1,NULL)) { perror("Error creating disk thread"); exit(-1); } } /* * Wait for the sorting threads to finish */ pthread_join(sort_threads[0],NULL); } /* * Stage 1 sorting routine - radix sort chunk buffers */ void *sort_stage1(void *nuthin) { int i; INFO("Sorting thread, stage 1 starting\n"); /* * Wait for the disk thread to decide we're done */ while (!stage1_done) { DEBUG("Sort thread loop\n"); /* * Wait for work to do, or for the stage to be over */ pthread_mutex_lock(&chunk_mutex); while(!chunks_to_sort && !stage1_done) { pthread_cond_wait(&chunk_read_cond,&chunk_mutex); } /* * Look for a FULL chunk, and start working on it, making it an INUSE * chunk. */ for (i = 0; i < N_CHUNKBUFFERS; i++) { if (chunk_info[i].state == CHUNK_FULL) { FLOAT* out; chunk_info[i].state = CHUNK_INUSE; chunks_to_sort--; chunks_inuse++; pthread_mutex_unlock(&chunk_mutex); /* INFO("Sorting chunk %i\n",chunk_info[i].number); */ #ifdef LIB_QSORT DEBUG("Calling qsort\n"); qsort(chunk_info[i].read_buffer,chunk_info[i].size / 4, 4,&float_comparator); #endif #ifdef RADSORT /* * Call the radix sort - since radix is not an inplace sort, * we have to let it tell us where it has put the output */ out = chunk_info[i].read_buffer + CHUNK_SIZE/4; out = (FLOAT*) rad_sort(RADCAST chunk_info[i].read_buffer, RADCAST out, chunk_info[i].size / 4); chunk_info[i].write_buffer = out; #endif /* * Let the other threads know we've finished sorting this * chunk. */ pthread_mutex_lock(&chunk_mutex); chunk_info[i].state = CHUNK_SORTED; chunks_inuse--; chunks_to_write++; pthread_cond_signal(&chunk_sorted_cond); } } pthread_mutex_unlock(&chunk_mutex); } INFO("Sorting thread, stage 1 exiting\n"); return NULL; } /* * Disk thread for stage 1 - read and write chunk buffers */ void *disk_stage1(void *_args) { int i; uint32_t current_read_chunk; struct disk_stage1_args *args = (struct disk_stage1_args*)_args; INFO("Disk thread, stage 1 starting\n"); /* * Initialize all of the chunk metadata */ chunks_to_sort = 0; chunks_to_write = 0; chunks_inuse = 0; if (n_chunks > N_CHUNKBUFFERS) { chunks_to_read = N_CHUNKBUFFERS; } else { chunks_to_read = n_chunks; } /* * Loop looking for work to do */ current_read_chunk = 0; while (chunks_to_write || chunks_to_read || chunks_to_sort || chunks_inuse) { DEBUG("Disk thread loop\n"); /* * Wait for a chunk to be ready to read or write */ pthread_mutex_lock(&chunk_mutex); while (!chunks_to_read && !chunks_to_write) { pthread_cond_wait(&chunk_sorted_cond,&chunk_mutex); } /* * Go through the chunk buffers, looking for one to do work on */ for (i = 0; i < N_CHUNKBUFFERS; i++) { switch (chunk_info[i].state) { case CHUNK_EMPTY: if (current_read_chunk < n_chunks) { /* INFO("Reading chunk %i\n",current_read_chunk); */ /* * Let other threads know we're reading this one */ chunk_info[i].state = CHUNK_INUSE; chunks_inuse++; if (current_read_chunk == (n_chunks - 1)) { chunk_info[i].size = last_chunk_size; } else { chunk_info[i].size = CHUNK_SIZE; } chunk_info[i].number = current_read_chunk++; pthread_mutex_unlock(&chunk_mutex); DEBUG("Set size to %i\n",chunk_info[i].size); /* * Do the actual read, using our helper that does * optimal size block IO */ read_data(args->infd,chunk_info[i].read_buffer, chunk_info[i].size,0,0); DEBUG("Chunk read finished\n"); /* * Let sorting threads know they can now sort this * chunk */ pthread_mutex_lock(&chunk_mutex); chunk_info[i].state = CHUNK_FULL; chunks_inuse--; chunks_to_read--; chunks_to_sort++; pthread_cond_signal(&chunk_read_cond); } else { /* * We've read the last chunk */ chunks_to_read = 0; } break; case CHUNK_SORTED: /* * Let other threads know we're writing this chunk */ chunk_info[i].state = CHUNK_INUSE; chunks_inuse++; pthread_mutex_unlock(&chunk_mutex); INFO("Writing chunk %i (stage1 %.2f%% done)\n", chunk_info[i].number, chunk_info[i].number*100.0 / n_chunks); /* * Do the actual write */ write_data(args->tmpfd,chunk_info[i].write_buffer, chunk_info[i].size,1, CHUNK_SIZE * chunk_info[i].number); /* * We'll find out on the next pass through the loop that * this chunk is ready to read */ pthread_mutex_lock(&chunk_mutex); chunks_to_write--; chunks_to_read++; chunks_inuse--; chunk_info[i].state = CHUNK_EMPTY; break; default: ; /* Nothing to do */ } } pthread_mutex_unlock(&chunk_mutex); } INFO("Disk thread, stage 1 exiting\n"); /* * Flag that the stage is done, and wake up the sorter threads so they'll * notice. */ pthread_mutex_lock(&chunk_mutex); stage1_done = 1; pthread_cond_broadcast(&chunk_read_cond); pthread_mutex_unlock(&chunk_mutex); return NULL; } /* ------------------------------------------------------------------------- * Stage 2 * ------------------------------------------------------------------------- */ /* * This messy stuff is used to keep track of all of the things we're doing with * the write buffers */ int buffers_to_write; int buffers_inuse; int buffers_empty; int buffers_pending; int stacks_to_load; /* * Used to tell the disk thread we're done with this stage */ int stage2_done; /* * Place where the heap entries reside */ struct heap_entry *entries; /* * Keep track of how much we've written for informational purposes */ unsigned long total_written = 0; /* * Stage 2 - Do a parallel merge of all chunks */ void stage2(int tmpfd, int outfd) { struct heap_entry **heap; uint32_t heap_size; int i; FLOAT *stacks; int current_write_buffer; FLOAT *outbuf; pthread_t disk_thread; struct disk_stage2_args disk_args; size_t disk_offset; /* * Do the memory layout for stage 1 */ /* Stack entries go at the bottom */ entries = (struct heap_entry*)scratch_bottom; /* Leave space for n_chunks heap entries, then the heap starts */ heap = (struct heap_entry**)((struct heap_entry*)entries + n_chunks); /* Leave space for a heap of size heap_size, then start the write buffers */ write_buffers[0].buffer = (FLOAT*)((struct heap_entry**)heap + n_chunks); /* Pack in all of the write buffers */ for (i = 1; i < N_WRITE_BUFFERS; i++) { write_buffers[i].buffer = (FLOAT*)((void*)write_buffers[i-1].buffer + STAGE2_OUTBUF_SIZE); } /* Stacks start after the end of the write buffers */ stacks = (FLOAT*)((void*)write_buffers[N_WRITE_BUFFERS - 1].buffer + STAGE2_OUTBUF_SIZE); /* * Figure out what size stacks we can fit in the space allocated to them */ stack_size = (scratch_top - (void*)stacks) / n_chunks; #ifdef ROUND_SIZES stack_size = round_down(stack_size, io_blocksize); #endif INFO("%i bytes left in memory, using stacks of size %i\n", (scratch_top - (void*)stacks), stack_size); INFO("entries: %p, heap: %p, stacks: %p\n", entries,heap,stacks); /* * Read in the intial contents of all of the stacks */ heap_size = 0; load_stacks(tmpfd,entries,stacks); for (i = 0; i < n_chunks; i++) { heap_insert(heap,heap_size,entries + i); heap_size++; } /* * Set up thread state */ pthread_cond_init(&buffer_io_cond,NULL); pthread_cond_init(&buffer_io_done_cond,NULL); pthread_mutex_init(&buffer_mutex,NULL); stacks_to_load = 0; stage2_done = 0; /* * Write buffers start empty */ for (i = 0; i < N_WRITE_BUFFERS; i++) { write_buffers[i].size = 0; write_buffers[i].state = BUFFER_EMPTY; } buffers_empty = N_WRITE_BUFFERS; /* * Grab the first write buffer to write sorted output into */ current_write_buffer = 0; outbuf = write_buffers[current_write_buffer].buffer; write_buffers[current_write_buffer].state = BUFFER_INUSE; write_buffers[current_write_buffer].offset = 0; buffers_inuse = 1; buffers_empty--; /* * Start writing at the beginning of the output file */ disk_offset = 0; /* * Fork off a disk thread */ disk_args.tmpfd = tmpfd; disk_args.outfd = outfd; if (pthread_create(&disk_thread,NULL,disk_stage2,&disk_args)) { perror("Creating disk thread"); exit(-1); } /* * Keep going until we've emptied the heap */ while (heap_size > 0) { DEBUG("Heap size: %i\n",heap_size); /* * If we've filled up an output buffer, tell the disk thread to * flush it, and grab another write buffer */ if (write_buffers[current_write_buffer].size == STAGE2_OUTBUF_SIZE) { DEBUG("Flushing %i to disk - %i\n",current_write_buffer, write_buffers[current_write_buffer].size); /* * Tell the disk thread we're ready to flush */ pthread_mutex_lock(&buffer_mutex); write_buffers[current_write_buffer].state = BUFFER_FULL; buffers_inuse--; buffers_to_write++; pthread_cond_signal(&buffer_io_cond); disk_offset += write_buffers[current_write_buffer].size; /* Unlock so that the disk thread gets to run */ pthread_mutex_unlock(&buffer_mutex); /* * Find a new write buffer to use */ pthread_mutex_lock(&buffer_mutex); /* Maybe we've gotten ahead of the disk thread */ while (!buffers_empty) { /* INFO("Waiting for a free buffer\n"); */ pthread_cond_wait(&buffer_io_done_cond,&buffer_mutex); } /* * Find a new write buffer to use */ for (i = 0; i < N_WRITE_BUFFERS; i++) { if (write_buffers[i].state == BUFFER_EMPTY) { write_buffers[i].state = BUFFER_INUSE; write_buffers[i].size = 0; write_buffers[i].offset = disk_offset; buffers_empty--; buffers_inuse++; current_write_buffer = i; break; } } DEBUG("Picked buffer %i\n",current_write_buffer); pthread_mutex_unlock(&buffer_mutex); outbuf = write_buffers[current_write_buffer].buffer; } DEBUG("heap: %p, cs: %p\n",heap,outbuf); /* * Put the top of the heap into the output buffer */ *outbuf = HEAP(0); (write_buffers[current_write_buffer].size) += 4; outbuf++; /* * Pop the top stack in the heap */ (heap[0]->stack_current)++; heap[0]->remaining--; DEBUG("remaining: %i\n",heap[0]->remaining); /* * This stack is empty - decide what to do with it */ if (heap[0]->remaining == 0) { /* * If this stack was the last in its chunk, just pop it off the * stack */ if (heap[0]->end_of_chunk) { DEBUG("popping\n"); heap_pop(heap,heap_size); heap_size--; } else { /* * Otherwise, we have to reload the stack */ DEBUG("load\n"); #ifdef STAGE2_LOAD_SYNC load_stack(tmpfd,heap[0]); #else /* * Tell the disk tread we've got a stack for it to read */ pthread_mutex_lock(&buffer_mutex); stacks_to_load++; pthread_cond_signal(&buffer_io_cond); /* * Wait for it to finish */ while (!heap[0]->remaining) { pthread_cond_wait(&buffer_io_done_cond,&buffer_mutex); } pthread_mutex_unlock(&buffer_mutex); #endif /* * Re-heapify, since the stack has been reloaded */ min_heapify_down(heap,heap_size,0); } } else { /* if (heap[0]->remaining == 0) */ /* * Re-heapify in case the pop made this stack no longer the * smallest. */ min_heapify_down(heap,heap_size,0); } } /* while (heap_size > 0) */ /* * Okay, we're done merging, but we have to tell the disk thread to * flush the last write buffer we used */ INFO("Flushing to disk\n"); pthread_mutex_lock(&buffer_mutex); write_buffers[current_write_buffer].state = BUFFER_FULL; buffers_inuse--; buffers_to_write++; pthread_cond_signal(&buffer_io_cond); stage2_done = 1; /* * Unlock so that the disk thread gets to run */ pthread_mutex_unlock(&buffer_mutex); /* * Wait for the disk thread to finish */ pthread_join(disk_thread,NULL); } /* * Disk thread for stage 2 - write out write buffers and read in stacks */ void *disk_stage2(void *_args) { struct disk_stage2_args *args = (struct disk_stage2_args*)_args; int i; INFO("Disk thread, stage 2 starting\n"); /* * As long as there's some work to do, stay in this loop */ while (!stage2_done || buffers_pending || buffers_to_write) { DEBUG("Disk thread here: %i %i %i %i\n",stage2_done,buffers_pending, buffers_to_write,stacks_to_load); pthread_mutex_lock(&buffer_mutex); /* * Wait for actual work to do */ while (!buffers_to_write && !stacks_to_load) { DEBUG("Disk thread waiting\n"); pthread_cond_wait(&buffer_io_cond,&buffer_mutex); } /* * Look for buffers to write */ for (i = 0; i < N_WRITE_BUFFERS; i++) { if (write_buffers[i].state == BUFFER_FULL) { /* * Mark it as ours */ write_buffers[i].state = BUFFER_INUSE; buffers_to_write--; buffers_pending++; pthread_mutex_unlock(&buffer_mutex); DEBUG("Writing data in buffer %i (size %i)\n",i, write_buffers[i].size); /* * Do the actual write */ write_data(args->outfd,write_buffers[i].buffer, write_buffers[i].size,1,write_buffers[i].offset); total_written += write_buffers[i].size; #ifdef DEVELOPING /* * Every once in a while, let the user know how we're doing */ if (total_written % (STAGE2_OUTBUF_SIZE * 10) == 0) { INFO("Sorted bytes written: %lu (stage2 %.2f%% done)\n", total_written, total_written * 100.0 / file_size); } #endif DEBUG("Write done\n"); /* * Toss this buffer back to the sorting thread */ pthread_mutex_lock(&buffer_mutex); buffers_pending--; buffers_empty++; write_buffers[i].state = BUFFER_EMPTY; pthread_cond_signal(&buffer_io_done_cond); } } pthread_mutex_unlock(&buffer_mutex); #ifndef STAGE2_LOAD_SYNC /* * Looks for stacks that need reloading - this is a slow way to do * this, but I don't want to worry about a more complicated data * structure right now. */ for (i = 0; i < n_chunks; i++) { /* if (entries[i].remaining == 0) { */ if (entries[i].remaining <= 0) { /* * Load the stack */ load_stack(args->tmpfd,&(entries[i])); /* * Tell the sorting thread we're done */ pthread_mutex_lock(&buffer_mutex); stacks_to_load--; pthread_mutex_unlock(&buffer_mutex); pthread_cond_signal(&buffer_io_done_cond); } } #endif } INFO("Disk thread, stage 2 exiting\n"); return NULL; } /* * Initialize the metadata for each stack, and load it from disk */ void load_stacks(int tmpfd, struct heap_entry *entries, FLOAT *stacks) { int i; FLOAT *stack_start; for (i = 0; i < n_chunks; i++) { entries[i].chunk_number = i; stack_start = (FLOAT*)((void*)stacks + stack_size * i); entries[i].stack_top = entries[i].stack_current = stack_start; entries[i].next_start_pos = 0; entries[i].end_of_chunk = 0; load_stack(tmpfd, entries + i); } } /* * Load a single stack */ void load_stack(int tmpfd, struct heap_entry *entry) { size_t size; size_t chunk_size; size_t remaining; DEBUG("Loading stack from chunk %i\n",entry->chunk_number); /* * Figure out how much of the chunk there is left to read */ if (entry->chunk_number == (n_chunks - 1)) { chunk_size = last_chunk_size; } else { chunk_size = CHUNK_SIZE; } size = stack_size; remaining = chunk_size - (entry->next_start_pos * 4); DEBUG("fbr: %i, cs: %i, nsp: %li\n",remaining, chunk_size, (unsigned long int)entry->next_start_pos); /* * If we're reading the end of the chunk, mark this in the header */ if (remaining <= stack_size) { size = remaining; entry->end_of_chunk = 1; DEBUG("End of chunk\n"); } /* * Do the read */ read_data(tmpfd,entry->stack_top,size,1,(CHUNK_SIZE * entry->chunk_number) + (entry->next_start_pos * 4)); /* Convert from bytes to floats */ entry->remaining = (size >> 2); DEBUG("set remaining to %i\n",entry->remaining); /* * Remember the next place to start reading from */ entry->next_start_pos = entry->next_start_pos + (size >> 2); /* * Start the stack at the top */ entry->stack_current = entry->stack_top; } /* ------------------------------------------------------------------------- * Heap implementation * ------------------------------------------------------------------------- */ /* * Convenience macros */ #define PARENT(x) ((x-1) >> 1) #define LCHILD(x) (((x) << 1) + 1) #define RCHILD(x) (((x) << 1) + 2) /* * From a given starting position, bubble the heap entry up to its propper * location */ void min_heapify_up(struct heap_entry **heap, const uint32_t start) { int i = start; struct heap_entry *parent; struct heap_entry *this; this = heap[i]; parent = heap[PARENT(i)]; /* * Loop until we're the top element, or the parent element is * small (as its suppsed to be) */ while ((i > 0) && FLOAT_LT(*(this->stack_current),*(parent->stack_current))) { /* * Exchange with parent */ heap[PARENT(i)] = this; heap[i] = parent; i = PARENT(i); this = heap[i]; if (i > 0) { parent = heap[PARENT(i)]; } } } /* * From a given starting position, sink the heap entry down to its propper * location */ void min_heapify_down(struct heap_entry **heap, const uint32_t size, const uint32_t start) { int i = start; register uint32_t lchild, rchild, smallest; register struct heap_entry *tmp; /* * More complicated exit condition */ while (1) { lchild = LCHILD(i); rchild = RCHILD(i); /* * Figure out which of the three (us, and our children) is the smallest */ smallest = i; if ((lchild < size) && (HEAP(lchild) < HEAP(i))) { smallest = lchild; } if ((rchild < size) && (HEAP(rchild) < HEAP(smallest))) { smallest = rchild; } /* * If it's not us, swap with the smallest - otherwise, we're done */ if (smallest != i) { tmp = heap[i]; heap[i] = heap[smallest]; heap[smallest] = tmp; i = smallest; } else { break; } } } /* * Pop an element off the heap - caller must decrement heap size */ void heap_pop(struct heap_entry **heap, uint32_t size) { DEBUG("Moving heap element at %i\n",size -1); DEBUG("Remaining is %i\n",heap[size -1]->remaining); heap[0] = heap[size -1]; min_heapify_down(heap,size -1,0); } /* * Insert an element into the heap - caller must increment heap size */ void heap_insert(struct heap_entry **heap, uint32_t size, struct heap_entry *item) { heap[size] = item; min_heapify_up(heap,size); } /* ------------------------------------------------------------------------- * Radix sort implementation * ------------------------------------------------------------------------- */ #define RADIX_SIZE 8 #define RADIX_BUCKETS (256) #define RADIX_PASSES 4 /* * For larger radix - broken */ /* #define RADIX_SIZE 11 #define RADIX_BUCKETS (256 << 3) #define RADIX_PASSES 3 */ uint32_t *rad_sort(uint32_t *in, uint32_t *out, int count) { uint32_t buckets[RADIX_BUCKETS]; uint32_t offsets[RADIX_BUCKETS]; uint32_t bitmask; int i; int pass; DEBUG("rad_sort in = %p, out = %p\n",in, out); /* * Flip with Michael Herf's algorithm */ #ifdef FLOATFLIP for (i = 0; i <= count; i++) { in[i] = in[i] ^ (-(int32_t)(in[i] >> 31) | 0x80000000); } #endif /* * Go through the passes */ for (pass = 0; pass < RADIX_PASSES; pass++) { bitmask = 0; for (i = 0; i < RADIX_SIZE; i++) { bitmask = (bitmask << 1) | 0x01; } bitmask = bitmask << (pass * RADIX_SIZE); DEBUG("Bitmask for pass %i is 0x%08x\n", pass, bitmask); /* * Clear out the buckets */ bzero(buckets,RADIX_BUCKETS * sizeof(uint32_t)); bzero(offsets,RADIX_BUCKETS * sizeof(uint32_t)); /* * First stage, fill the buckets */ for (i = 0; i < count; i++) { uint32_t tmp = in[i]; unsigned char rad = (tmp & bitmask) >> (pass * RADIX_SIZE); buckets[rad]++; } /* * Second stage, make the offset table */ for (i = 1; i < RADIX_BUCKETS; i++) { offsets[i] = offsets[i - 1] + buckets[i - 1]; } /* * Finally, copy into the output buffer */ for (i = 0; i < count; i++) { unsigned char rad; rad = ((uint32_t)in[i] & bitmask) >> (pass * RADIX_SIZE); out[offsets[rad]++] = in[i]; } /* * Flip the input and output buffers for the next pass */ uint32_t* tmp = out; out = in; in = tmp; } /* * Flip back */ #ifdef FLOATFLIP for (i = 0; i <= count; i++) { in[i] = in[i] ^ (((in[i] >> 31) - 1) | 0x80000000); } #endif return in; } /* ------------------------------------------------------------------------- * Disk IO helper functions * ------------------------------------------------------------------------- */ /* * Read data into a buffer, using the optimal blocksize */ inline void read_data(int infd, FLOAT *buffer, uint32_t length, int seek, off_t file_offset) { int i; /* * Seek if they asked us to */ if (seek != 0) { if (lseek(infd,file_offset,SEEK_SET) == -1) { perror("Error seeking in input file"); exit(-1); } } #ifdef FS_BLOCKSIZE_IO /* * Load in all blocks we can load at full blocksize */ for (i = 0; i < (length / io_blocksize); i++) { read(infd,buffer,io_blocksize); buffer = (FLOAT*)((unsigned char *)buffer + io_blocksize); } /* * Load the last block */ read(infd, buffer, length % io_blocksize); #else /* * Just slurp the whole thing in one read() */ read(infd,buffer,length); #endif } /* * Read data into a buffer, using the optimal blocksize */ inline void write_data(int outfd, FLOAT *buffer, uint32_t length, int seek, off_t file_offset) { int i; /* * Seek if they asked us to */ if (seek != 0) { DEBUG("Seeking to %li\n", (unsigned long int)file_offset); if (lseek(outfd,file_offset,SEEK_SET) == -1) { perror("Error seeking in output file"); exit(-1); } } #ifdef FS_BLOCKSIZE_IO /* * Write all blocks we can load at full blocksize */ for (i = 0; i < (length / io_blocksize); i++) { write(outfd,buffer,io_blocksize); buffer = (FLOAT*)((unsigned char *)buffer + io_blocksize); } /* * Write the last block */ write(outfd, buffer, length % io_blocksize); #else /* * Just write the whole thing in one write() */ write(outfd,buffer,length); #endif } /* ------------------------------------------------------------------------- * Other helpers * ------------------------------------------------------------------------- */ /* * Round a to the nearest b - round down, but never to 0 */ int round_down(int a, int b) { int rounded; rounded = (a / b) * b; if (rounded > 0) { return rounded; } else { return a; } }