/*
 * supersort.c
 *
 * Copyright 2005 Robert Ricci
 * -----------------------------------------------------------------------------
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  If you don't know how to find a copy of the GNU General Public License,
 *  well, welcome to the 21st century! Or, write to the Free Software
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 * -----------------------------------------------------------------------------
 *
 * Uses a two-pass algorithm: The first pass does a Radix sort on chunks (tens
 * of MB) of floats from the input file, and writes the sorted chunks to the
 * temporary file. The second stage does a parallel merge on all of the chunks
 * using a heap. Obviously, we can only hold so much data to sort at a time, so
 * we pull from the head of all chunks in parallel, going back to disk to get
 * more when our data from each chunk runs dry.
 *
 * Known to compile on FreeBSD 5.x, 6.x, and Suse Linux 9 (should work on any
 * Linux with gcc 3.X)
 *
 * Building:
 *   On FreeBSD: gcc -o supersort -pthread -O3 supersort.c
 *   On Linux: gcc -o supersort -lpthread -O3 supersort.c
 *
 * See options below
 */
static const char rcsid[] = "$Id: supersort.c,v 1.16 2005/11/28 07:38:38 ricci Exp $";

#include <stdlib.h> 
#include <string.h> 
#include <stdio.h> 
#include <sys/types.h> 
#include <unistd.h> 
#include <fcntl.h> 
#include <errno.h> 
#include <sys/stat.h> 
#include <sys/uio.h> 
#include <assert.h> 
#include <pthread.h> 
#include <sys/time.h> 

#ifdef __linux__ 
#include <stdint.h> 
#endif 

/*
 * Memory layouts:
 * (NOTE: drawn upside down)
 *
 * ##### Stage 1:
 * CHUNK_SIZE = SCRATCH_SIZE / (N_PROCESSORS + 1)
 * scratch_bottom =
 * ---------------------------------
 * CHUNK_SIZE
 * ---------------------------------
 * repeat (N_PROCESSORS + 1) times
 *
 * stage 2:
 * scratch_bottom =
 * entries =
 * ---------------------------------
 *  n_chunks * sizeof(struct entry)
 * ---------------------------------
 * heap =
 * ---------------------------------
 *  n_chunks * sizeof(struct entry *)
 * ---------------------------------
 * write_buffers[x].buffer =
 * ---------------------------------
 *  OUTBUF_SIZE
 * ---------------------------------
 *  OUTBUF_SIZE
 * ---------------------------------
 * buffers =
 * ---------------------------------
 * leftover / n_chunks
 * ---------------------------------
 */


/*
 * Options controlling sort behavior (all sizes are in bytes)
 */

/*
 * Number of processors this sort will be run on
 */
#define N_PROCESSORS 2 

/*
 * Pick a sort - only define one of these
 */
/* My own implementation of Radix Sort */
#define RADSORT 
/* The C library's implementation of quicksort */
/* #define LIB_QSORT */

/*
 * Use the flipping trick to handle negative floats in the radix sort
 */
#define FLOATFLIP 

/*
 * Round sizes of data structures to disk block sizes
 */
#define ROUND_SIZES 

/*
 * Use integer instructions for FP compares
 */
// #define INTEGER_FP

/*
 * How many sorting threads to run (note, we are disk bound, so making this
 * higher than 1 doesn't really help)
 */
#define N_SORT_THREADS N_PROCESSORS 

/*
 * Do we try to optimize disk I/O by doing transfers that are the size of the
 * filesystem blocksize?
 */
#define FS_BLOCKSIZE_IO 

/*
 * Amount of memory we malloc() on the heap for buffering, etc.
 */
#define SCRATCH_SIZE (64 << 20) // 64 MByte 


/*
 * Size and number of buffers to use for write buffering in the second stage
 */
#define N_WRITE_BUFFERS 2 
#define STAGE2_OUTBUF_SIZE (1048576/2) 

/*
 * Do sync loads in stage 2
 */
/* #define STAGE2_LOAD_SYNC */

/*
 * Name of our temporary file
 */
#define TMPFILE "supersort.tmp" 

/*
 * Turn on various informational messages, as well as some timing code
 */
#define DEVELOPING 

/*
 * Turn on gobs of debugging output
 */
/* #define DEBUGGING */

/*
 * End of options
 */

/* 
 * We have one extra chunk buffer so that each CPU can be sorting while the
 * disk is being read.
 */
#define N_CHUNKBUFFERS (N_PROCESSORS + 1) 

#ifndef INTEGER_FP 
#define FLOAT float 
#define FLOAT_LT(a,b) ((a) < (b)) 
#define FLOAT_EQ(a,b) ((a) == (b)) 
#define RADCAST (uint32_t*) 
#else 
/*
 * No floating point - use the formulas for doing FP using integer instructions
 * from Hacker's Delight
 */
#define FLOAT uint32_t 
#define FLOAT_LT(a,b) ((((int32_t)(a) >= 0) & ((int32_t)(a) < (int32_t)(b))) | (((int32_t)(a) < 0) & ((a) > (b)))) 
#define FLOAT_EQ(a,b) ((a) == (b)) 
#define RADCAST 
#endif 

/*
 * Bottom and top of the scratch buffer
 */
void *scratch_bottom;
void *scratch_top;

/*
 * If we're doing a radix sort, we have to cut the chunk size in half, because
 * it's not an inplace sort.
 */
#ifdef RADSORT 
uint32_t CHUNK_SIZE = ((SCRATCH_SIZE / (N_CHUNKBUFFERS)) / 2);
#else 
uint32_t CHUNK_SIZE = (SCRATCH_SIZE / (N_CHUNKBUFFERS));
#endif 

/*
 * Information about the chunk buffers for stage 1, used by the sort threads
 * and the disk thread to communicate.
 */
typedef enum { CHUNK_EMPTY, CHUNK_FULL, CHUNK_SORTED, CHUNK_INUSE } chunk_state_t;

struct chunk_info_t {
    int number;
    int size;
    chunk_state_t state;
    FLOAT *read_buffer;
    FLOAT *write_buffer;
};

int chunks_to_read, chunks_to_write, chunks_to_sort, chunks_inuse;

struct chunk_info_t chunk_info[N_CHUNKBUFFERS];
/* Used to lock the chunk metadata */
pthread_mutex_t chunk_mutex;
/* Signaled when a read into a chunk buffer is done */
pthread_cond_t chunk_read_cond;
/* Signaled when a chunk buffer has been sorted */
pthread_cond_t chunk_sorted_cond;

/*
 * Information about the write buffers for stage 2 - used by the sorting
 * and disk threads to communicate.
 */
typedef enum { BUFFER_EMPTY, BUFFER_FULL, BUFFER_INUSE } buffer_state_t;
struct buffer_info_t {
    FLOAT *buffer;
    int size;
    buffer_state_t state;
    off_t offset;
};

struct buffer_info_t write_buffers[N_WRITE_BUFFERS];
/* Used to lock the buffer metadata */
pthread_mutex_t buffer_mutex;
/* Used to singal that a write buffer flush is being requested */
pthread_cond_t buffer_io_cond;
/* Used to singal that a write buffer flush is finished */
pthread_cond_t buffer_io_done_cond;

/*
 * Argments for the disk stages - necessary, because pthreads only allows you
 * to pass a single argument pointer to forked threads.
 */
struct disk_stage1_args {
    int infd;
    int tmpfd;
};

struct disk_stage2_args {
    int tmpfd;
    int outfd;
};

/*
 * 'bools' used to tell threads when a stage is done
 */
int stage1_done;
int state2_done;

/* Now calculated dynamically below */
uint32_t stack_size = 0;

/*
 * Convenience macro, to get the top element of any element in the heap
 */
#define HEAP(x) (*(heap[(x)]->stack_current)) 

/*
 * Output Macros
 */
#ifdef DEBUGGING 
#define DEBUG(format, ...) fprintf(stderr,format, ## __VA_ARGS__) 
#else 
#define DEBUG(...)  
#endif 

#ifdef DEVELOPING 
#define INFO(format, ...) fprintf(stderr,format, ## __VA_ARGS__) 
#else 
#define INFO(format, ...)  
#endif 

/*
 * Entry on the heap - must be defined here because many things below reference
 * it.
 */
struct heap_entry {
    FLOAT *stack_top;
    FLOAT *stack_current;
    uint32_t chunk_number;
    uint32_t remaining;
    uint32_t end_of_chunk;
    off_t next_start_pos;
};

/*
 * Forward declarations of helper functions below.
 * Whew - that's a lot!
 */
inline void read_data(int, FLOAT*, uint32_t, int, off_t);
inline void write_data(int, FLOAT*, uint32_t, int, off_t);
int round_down(int,int);
void stage1(int,int);
void *disk_stage1(void *);
void stage2(int,int);
void *disk_stage2(void *);
void *sort_stage1(void *);
void load_stack(int, struct heap_entry*);
void load_stacks(int, struct heap_entry *, FLOAT *);
void min_heapify_down(struct heap_entry**, const uint32_t, const uint32_t);
void heap_pop(struct heap_entry**, uint32_t);
void heap_insert(struct heap_entry**, uint32_t, struct heap_entry*);


#ifdef RADSORT 
uint32_t *rad_sort(uint32_t *in, uint32_t *out, int count);
#endif 

off_t file_size;
int main(int argc, char **argv) {

    int infd, outfd, tmpfd;
#ifdef DEVELOPING 
    struct timeval stage1_start,stage1_stop,stage2_start,stage2_stop,
                   stage1_total, stage2_total, total;
    float stage1_rate, stage2_rate, total_rate;
#endif 

    if (argc != 3) {
	fprintf(stderr,"usage: %s <infile> <outfile>\n",argv[0]);
	exit(-1);
    }

    /*
     * Open up our files
     */

    /*
     * TODO: Open any of these with O_DIRECT? (portable?)
     */
    if ((infd = open(argv[1],O_RDONLY)) < 1) {
	fprintf(stderr,"Error opening input file %s\n",argv[1]);
		/* strerror(errno)); Linux does not like this */
	exit(-1);
    }

    if ((outfd =
              open(argv[2],O_WRONLY|O_CREAT|O_TRUNC,S_IWUSR | S_IRUSR)) < 1) {
	fprintf(stderr,"Error opening output file %s: %s\n",argv[2],
		strerror(errno));
	exit(-1);
    }

    if ((tmpfd = open(TMPFILE,O_RDWR|O_CREAT|O_TRUNC, S_IWUSR | S_IRUSR)) < 1) {
	fprintf(stderr,"Error opening temp file %s: %s\n",TMPFILE,
		strerror(errno));
	exit(-1);
    }

    /*
     * Thanks to UNIX semantics, this file will not really go away until we
     * close it. This way, if we crash, it definitely gets removed.
     */
#ifndef DEVELOPING 
    unlink(TMPFILE);
#endif 

    /* 
     * Grab some space on the heap to work in
     * No malloc() allowed after this point!
     * Note - pthreads uses malloc, so this option is deprecated
     */
#ifdef ALLOC_SBRK 
    scratch_bottom = sbrk(0);
    if (sbrk(SCRATCH_SIZE) == (void*)-1) {
	perror("Unable to allocate scratch memory:");
	exit(-1);
    }
    scratch_top = sbrk(0);
#else 
    scratch_bottom = malloc(SCRATCH_SIZE);
    if (!scratch_bottom) {
	perror("Unable to allocate scratch memory:");
	exit(-1);
    }
    scratch_top = scratch_bottom + SCRATCH_SIZE;
#endif 

#ifdef DEVELOPING 
    gettimeofday(&stage1_start,NULL);
#endif 

    /*
     * Run the first stage!
     */
    stage1(infd, tmpfd);

#ifdef DEVELOPING 
    gettimeofday(&stage1_stop,NULL);
#endif 
    INFO("Switching stages\n");
#ifdef DEVELOPING 
    gettimeofday(&stage2_start,NULL);
#endif 

    /*
     * Run the second stage.
     */
    stage2(tmpfd, outfd);

#ifdef DEVELOPING 
    gettimeofday(&stage2_stop,NULL);
#endif 

    close(infd);
    close(tmpfd);
    close(outfd);

    /*
     * Print out a lot of fun timings
     */
#ifdef DEVELOPING 
    timersub(&stage1_stop,&stage1_start,&stage1_total);
    timersub(&stage2_stop,&stage2_start,&stage2_total);
    timersub(&stage2_stop,&stage1_start,&total);
    stage1_rate = (file_size / 1024 / 1024) /
                  (stage1_total.tv_sec + stage1_total.tv_usec/1000000.0);
    stage2_rate =  (file_size / 1024 / 1024) /
        (stage2_total.tv_sec + stage2_total.tv_usec/1000000.0);
    total_rate = (file_size / 1024 / 1024) /
        (total.tv_sec + total.tv_usec/1000000.0);
    INFO("Stage 1: %li.%06li s (%f MB/sec)\n", stage1_total.tv_sec,
            stage1_total.tv_usec, stage1_rate);
    INFO("Stage 2: %li.%06li s (%f MB/sec)\n", stage2_total.tv_sec,
            stage2_total.tv_usec, stage2_rate);
    INFO("Total  : %li.%06li s (%f MB/sec)\n", total.tv_sec,
            total.tv_usec, total_rate);
#endif 

    exit(0);
}

/*
 * Passed as a comparator to the C library's qsort() function
 */
#ifndef RADSORT 
inline int float_comparator(const FLOAT *a, const FLOAT *b) {
    if (FLOAT_LT(*a,*b)) { return -1; }
    if (FLOAT_EQ(*a,*b)) { return 0; }
    return 1;
}
#endif 

/*
 * Number of chunks in the file
 */
uint32_t n_chunks;
/*
 * Size of the last chunk, since it might be smaller than the others.
 */
off_t last_chunk_size;
/*
 * Size we decide on for disk IO
 */
size_t io_blocksize;

/* -------------------------------------------------------------------------
 * Stage 1
 * ------------------------------------------------------------------------- */

/*
 * First stage: Sort chunks of the input file into the output file
 */
void stage1(int infd, int tmpfd) {
    struct stat sb;
    int i;
    pthread_t disk_thread;
    struct disk_stage1_args args;
    pthread_t sort_threads[N_SORT_THREADS];

    /*
     * Find out how many chunks there are in the file and the optimal IO
     * blocksize
     */
    fstat(infd,&sb);
    file_size = sb.st_size;
    io_blocksize = sb.st_blksize;

    INFO("Using I/O blocksize %i\n",io_blocksize);

#ifdef ROUND_SIZES 
    CHUNK_SIZE = round_down(CHUNK_SIZE,io_blocksize);
#endif 

    n_chunks = file_size / CHUNK_SIZE;
    last_chunk_size = file_size % CHUNK_SIZE;
    if (last_chunk_size == 0) {
	last_chunk_size = CHUNK_SIZE;
    } else {
	n_chunks++;
    }

    INFO("Using %i chunks of size %i\n",n_chunks,CHUNK_SIZE);
    INFO("Last chunk size %li\n",(unsigned long int)last_chunk_size);

    /*
     * Set up threading state
     */
    for (i = 0; i < N_CHUNKBUFFERS; i++) {
        chunk_info[i].state = CHUNK_EMPTY;
#ifdef RADSORT 
        chunk_info[i].read_buffer = scratch_bottom + i * CHUNK_SIZE * 2;
#else 
        chunk_info[i].read_buffer = scratch_bottom + i * CHUNK_SIZE;
        chunk_info[i].write_buffer = scratch_bottom + i * CHUNK_SIZE;
#endif 
    }
    pthread_mutex_init(&chunk_mutex,NULL);
    pthread_cond_init(&chunk_read_cond,NULL);
    pthread_cond_init(&chunk_sorted_cond,NULL);

    chunks_to_read = chunks_to_write = chunks_to_sort = 0;

    /*
     * Launch disk thread
     */
    stage1_done = 0;
    args.infd = infd;
    args.tmpfd = tmpfd;
    if (pthread_create(&disk_thread,NULL,disk_stage1, &args)) {
        perror("Error creating disk thread");
        exit(-1);
    }

    /*
     * Launch sorting threads
     */
    for (i = 0; i < N_SORT_THREADS; i++) {
        if (pthread_create(&(sort_threads[i]),NULL,sort_stage1,NULL)) {
            perror("Error creating disk thread");
            exit(-1);
        }
    }

    /*
     * Wait for the sorting threads to finish
     */
    pthread_join(sort_threads[0],NULL);
}

/*
 * Stage 1 sorting routine - radix sort chunk buffers
 */
void *sort_stage1(void *nuthin) {
    int i;
    INFO("Sorting thread, stage 1 starting\n");

     /*
      * Wait for the disk thread to decide we're done
      */
    while (!stage1_done) {
        DEBUG("Sort thread loop\n");

        /*
         * Wait for work to do, or for the stage to be over
         */
        pthread_mutex_lock(&chunk_mutex);
        while(!chunks_to_sort && !stage1_done) {
            pthread_cond_wait(&chunk_read_cond,&chunk_mutex);
        }

        /*
         * Look for a FULL chunk, and start working on it, making it an INUSE
         * chunk.
         */
        for (i = 0; i < N_CHUNKBUFFERS; i++) {
            if (chunk_info[i].state == CHUNK_FULL) {
                FLOAT* out;
                chunk_info[i].state = CHUNK_INUSE;
                chunks_to_sort--;
                chunks_inuse++;

                pthread_mutex_unlock(&chunk_mutex);

                /* INFO("Sorting chunk %i\n",chunk_info[i].number); */
#ifdef LIB_QSORT 
                DEBUG("Calling qsort\n");
                qsort(chunk_info[i].read_buffer,chunk_info[i].size / 4,
                        4,&float_comparator);
#endif 
#ifdef RADSORT 
                /*
                 * Call the radix sort - since radix is not an inplace sort,
                 * we have to let it tell us where it has put the output
                 */
                out = chunk_info[i].read_buffer + CHUNK_SIZE/4;
                out = (FLOAT*) rad_sort(RADCAST chunk_info[i].read_buffer,
                                       RADCAST out, chunk_info[i].size / 4);
                chunk_info[i].write_buffer = out;
#endif 
                /*
                 * Let the other threads know we've finished sorting this
                 * chunk.
                 */
                pthread_mutex_lock(&chunk_mutex);

                chunk_info[i].state = CHUNK_SORTED;
                chunks_inuse--;
                chunks_to_write++;

                pthread_cond_signal(&chunk_sorted_cond);
            }
        }

        pthread_mutex_unlock(&chunk_mutex);
    }

    INFO("Sorting thread, stage 1 exiting\n");
    return NULL;
}

/*
 * Disk thread for stage 1 - read and write chunk buffers
 */
void *disk_stage1(void *_args) {
    int i;
    uint32_t current_read_chunk;
    struct disk_stage1_args *args = (struct disk_stage1_args*)_args;

    INFO("Disk thread, stage 1 starting\n");

    /*
     * Initialize all of the chunk metadata
     */
    chunks_to_sort = 0;
    chunks_to_write = 0;
    chunks_inuse = 0;
    if (n_chunks > N_CHUNKBUFFERS) {
        chunks_to_read = N_CHUNKBUFFERS;
    } else {
        chunks_to_read = n_chunks;
    }

    /*
     * Loop looking for work to do
     */
    current_read_chunk = 0;
    while (chunks_to_write || chunks_to_read || chunks_to_sort ||
           chunks_inuse) {
        DEBUG("Disk thread loop\n");

        /*
         * Wait for a chunk to be ready to read or write
         */
        pthread_mutex_lock(&chunk_mutex);
        while (!chunks_to_read && !chunks_to_write) {
            pthread_cond_wait(&chunk_sorted_cond,&chunk_mutex);
        }

        /*
         * Go through the chunk buffers, looking for one to do work on
         */
        for (i = 0; i < N_CHUNKBUFFERS; i++) {
            switch (chunk_info[i].state) {
                case CHUNK_EMPTY:
                    if (current_read_chunk < n_chunks) {
                        /* INFO("Reading chunk %i\n",current_read_chunk); */
                                
                        /*
                         * Let other threads know we're reading this one
                         */
                        chunk_info[i].state = CHUNK_INUSE;
                        chunks_inuse++;
                        if (current_read_chunk == (n_chunks - 1)) {
                            chunk_info[i].size = last_chunk_size;
                        } else {
                            chunk_info[i].size = CHUNK_SIZE;
                        }
                        chunk_info[i].number = current_read_chunk++;

                        pthread_mutex_unlock(&chunk_mutex);

                        DEBUG("Set size to %i\n",chunk_info[i].size);

                        /*
                         * Do the actual read, using our helper that does
                         * optimal size block IO
                         */
                        read_data(args->infd,chunk_info[i].read_buffer,
                                chunk_info[i].size,0,0);

                        DEBUG("Chunk read finished\n");

                        /*
                         * Let sorting threads know they can now sort this
                         * chunk
                         */
                        pthread_mutex_lock(&chunk_mutex);

                        chunk_info[i].state = CHUNK_FULL;
                        chunks_inuse--;
                        chunks_to_read--;
                        chunks_to_sort++;

                        pthread_cond_signal(&chunk_read_cond);
                    } else {
                        /*
                         * We've read the last chunk
                         */
                        chunks_to_read = 0;
                    }
                    break;
                case CHUNK_SORTED:
                    /*
                     * Let other threads know we're writing this chunk
                     */
                    chunk_info[i].state = CHUNK_INUSE;
                    chunks_inuse++;

                    pthread_mutex_unlock(&chunk_mutex);

                    INFO("Writing chunk %i (stage1 %.2f%% done)\n",
                            chunk_info[i].number,
                            chunk_info[i].number*100.0 / n_chunks);

                    /*
                     * Do the actual write
                     */
                    write_data(args->tmpfd,chunk_info[i].write_buffer,
                            chunk_info[i].size,1,
                            CHUNK_SIZE * chunk_info[i].number);

                    /*
                     * We'll find out on the next pass through the loop that
                     * this chunk is ready to read
                     */
                    pthread_mutex_lock(&chunk_mutex);
                    chunks_to_write--;
                    chunks_to_read++;
                    chunks_inuse--;
                    chunk_info[i].state = CHUNK_EMPTY;

                    break;
                default:
                    ; /* Nothing to do */
            }
        }

        pthread_mutex_unlock(&chunk_mutex);

    }

    INFO("Disk thread, stage 1 exiting\n");

    /*
     * Flag that the stage is done, and wake up the sorter threads so they'll
     * notice.
     */
    pthread_mutex_lock(&chunk_mutex);
    stage1_done = 1;
    pthread_cond_broadcast(&chunk_read_cond);
    pthread_mutex_unlock(&chunk_mutex);

    return NULL;
}

/* -------------------------------------------------------------------------
 * Stage 2
 * ------------------------------------------------------------------------- */

/*
 * This messy stuff is used to keep track of all of the things we're doing with
 * the write buffers
 */
int buffers_to_write;
int buffers_inuse;
int buffers_empty;
int buffers_pending;
int stacks_to_load;

/*
 * Used to tell the disk thread we're done with this stage
 */
int stage2_done;

/*
 * Place where the heap entries reside
 */
struct heap_entry *entries;

/*
 * Keep track of how much we've written for informational purposes
 */
unsigned long total_written = 0;

/*
 * Stage 2 - Do a parallel merge of all chunks
 */
void stage2(int tmpfd, int outfd) {
    struct heap_entry **heap;
    uint32_t heap_size;
    int i;
    FLOAT *stacks;
    int current_write_buffer;
    FLOAT *outbuf;
    pthread_t disk_thread;
    struct disk_stage2_args disk_args;
    size_t disk_offset;

    /*
     * Do the memory layout for stage 1
     */
    /* Stack entries go at the bottom */
    entries = (struct heap_entry*)scratch_bottom;
    /* Leave space for n_chunks heap entries, then the heap starts */
    heap = (struct heap_entry**)((struct heap_entry*)entries + n_chunks);
    /* Leave space for a heap of size heap_size, then start the write buffers */
    write_buffers[0].buffer = (FLOAT*)((struct heap_entry**)heap + n_chunks);
    /* Pack in all of the write buffers */
    for (i = 1; i < N_WRITE_BUFFERS; i++) {
        write_buffers[i].buffer =
            (FLOAT*)((void*)write_buffers[i-1].buffer + STAGE2_OUTBUF_SIZE);
    }
    /* Stacks start after the end of the write buffers */
    stacks = (FLOAT*)((void*)write_buffers[N_WRITE_BUFFERS - 1].buffer
            + STAGE2_OUTBUF_SIZE);

    /*
     * Figure out what size stacks we can fit in the space allocated to them
     */
    stack_size = (scratch_top - (void*)stacks) / n_chunks;

#ifdef ROUND_SIZES 
    stack_size = round_down(stack_size, io_blocksize);
#endif 

    INFO("%i bytes left in memory, using stacks of size %i\n",
            (scratch_top - (void*)stacks), stack_size);

    INFO("entries: %p, heap: %p, stacks: %p\n",
	    entries,heap,stacks);

    /*
     * Read in the intial contents of all of the stacks
     */
    heap_size = 0;
    load_stacks(tmpfd,entries,stacks);
    for (i = 0; i < n_chunks; i++) {
	heap_insert(heap,heap_size,entries + i);
	heap_size++;
    }

    /*
     * Set up thread state
     */
    pthread_cond_init(&buffer_io_cond,NULL);
    pthread_cond_init(&buffer_io_done_cond,NULL);
    pthread_mutex_init(&buffer_mutex,NULL);

    stacks_to_load = 0;
    stage2_done = 0;

    /*
     * Write buffers start empty
     */
    for (i = 0; i < N_WRITE_BUFFERS; i++) {
        write_buffers[i].size = 0;
        write_buffers[i].state = BUFFER_EMPTY;
    }
    buffers_empty = N_WRITE_BUFFERS;

    /*
     * Grab the first write buffer to write sorted output into
     */
    current_write_buffer = 0;
    outbuf = write_buffers[current_write_buffer].buffer;
    write_buffers[current_write_buffer].state = BUFFER_INUSE;
    write_buffers[current_write_buffer].offset = 0;
    buffers_inuse = 1;
    buffers_empty--;

    /*
     * Start writing at the beginning of the output file
     */
    disk_offset = 0;

    /*
     * Fork off a disk thread
     */
    disk_args.tmpfd = tmpfd;
    disk_args.outfd = outfd;
    if (pthread_create(&disk_thread,NULL,disk_stage2,&disk_args)) {
            perror("Creating disk thread");
            exit(-1);
    }

    /*
     * Keep going until we've emptied the heap
     */
    while (heap_size > 0) {
	DEBUG("Heap size: %i\n",heap_size);

        /*
         * If we've filled up an output buffer, tell the disk thread to
         * flush it, and grab another write buffer
         */
	if (write_buffers[current_write_buffer].size == STAGE2_OUTBUF_SIZE) {
            DEBUG("Flushing %i to disk - %i\n",current_write_buffer,
                    write_buffers[current_write_buffer].size);

            /*
             * Tell the disk thread we're ready to flush
             */
            pthread_mutex_lock(&buffer_mutex);

            write_buffers[current_write_buffer].state = BUFFER_FULL;
            buffers_inuse--;
            buffers_to_write++;

            pthread_cond_signal(&buffer_io_cond);
            disk_offset += write_buffers[current_write_buffer].size;
            
            /* Unlock so that the disk thread gets to run */
            pthread_mutex_unlock(&buffer_mutex);

            /*
             * Find a new write buffer to use
             */
            pthread_mutex_lock(&buffer_mutex);
            /* Maybe we've gotten ahead of the disk thread */
            while (!buffers_empty) {
                /* INFO("Waiting for a free buffer\n"); */
                pthread_cond_wait(&buffer_io_done_cond,&buffer_mutex);
            }

            /*
             * Find a new write buffer to use
             */
            for (i = 0; i < N_WRITE_BUFFERS; i++) {
                if (write_buffers[i].state == BUFFER_EMPTY) {
                    write_buffers[i].state = BUFFER_INUSE;
                    write_buffers[i].size = 0;
                    write_buffers[i].offset = disk_offset;
                    buffers_empty--;
                    buffers_inuse++;
                    current_write_buffer = i;
                    break;
                }
            }
            DEBUG("Picked buffer %i\n",current_write_buffer);
            pthread_mutex_unlock(&buffer_mutex);

	    outbuf = write_buffers[current_write_buffer].buffer;
	}

	DEBUG("heap: %p, cs: %p\n",heap,outbuf);

        /*
         * Put the top of the heap into the output buffer
         */
	*outbuf = HEAP(0);
        (write_buffers[current_write_buffer].size) += 4;
	outbuf++;

        /*
         * Pop the top stack in the heap
         */
	(heap[0]->stack_current)++;
	heap[0]->remaining--;

	DEBUG("remaining: %i\n",heap[0]->remaining);

        /*
         * This stack is empty - decide what to do with it
         */
	if (heap[0]->remaining == 0) {
            /*
             * If this stack was the last in its chunk, just pop it off the
             * stack
             */
	    if (heap[0]->end_of_chunk) {
		DEBUG("popping\n");
		heap_pop(heap,heap_size);
		heap_size--;
	    } else {
                /*
                 * Otherwise, we have to reload the stack
                 */
		DEBUG("load\n");

#ifdef STAGE2_LOAD_SYNC 
                load_stack(tmpfd,heap[0]);
#else 
                /*
                 * Tell the disk tread we've got a stack for it to read
                 */
                pthread_mutex_lock(&buffer_mutex);
                stacks_to_load++;
                pthread_cond_signal(&buffer_io_cond);

                /*
                 * Wait for it to finish
                 */
                while (!heap[0]->remaining) {
                    pthread_cond_wait(&buffer_io_done_cond,&buffer_mutex);
                }

                pthread_mutex_unlock(&buffer_mutex);
#endif 
                /*
                 * Re-heapify, since the stack has been reloaded
                 */
		min_heapify_down(heap,heap_size,0);
	    }
	} else { /* if (heap[0]->remaining == 0) */
            /*
             * Re-heapify in case the pop made this stack no longer the
             * smallest.
             */
	    min_heapify_down(heap,heap_size,0);
	}
    } /* while (heap_size > 0) */

    /*
     * Okay, we're done merging, but we have to tell the disk thread to
     * flush the last write buffer we used
     */
    INFO("Flushing to disk\n");

    pthread_mutex_lock(&buffer_mutex);

    write_buffers[current_write_buffer].state = BUFFER_FULL;
    buffers_inuse--;
    buffers_to_write++;

    pthread_cond_signal(&buffer_io_cond);
    stage2_done = 1;

    /*
     * Unlock so that the disk thread gets to run
     */
    pthread_mutex_unlock(&buffer_mutex);

    /*
     * Wait for the disk thread to finish
     */
    pthread_join(disk_thread,NULL);
}

/*
 * Disk thread for stage 2 - write out write buffers and read in stacks
 */
void *disk_stage2(void *_args) {
    struct disk_stage2_args *args = (struct disk_stage2_args*)_args;
    int i;

    INFO("Disk thread, stage 2 starting\n");

    /*
     * As long as there's some work to do, stay in this loop
     */
    while (!stage2_done || buffers_pending || buffers_to_write) {
        DEBUG("Disk thread here: %i %i %i %i\n",stage2_done,buffers_pending,
                buffers_to_write,stacks_to_load);
        pthread_mutex_lock(&buffer_mutex);

        /*
         * Wait for actual work to do
         */
        while (!buffers_to_write && !stacks_to_load) {
            DEBUG("Disk thread waiting\n");
            pthread_cond_wait(&buffer_io_cond,&buffer_mutex);
        }

        /*
         * Look for buffers to write
         */
        for (i = 0; i < N_WRITE_BUFFERS; i++) {
            if (write_buffers[i].state == BUFFER_FULL) {
                /*
                 * Mark it as ours
                 */
                write_buffers[i].state = BUFFER_INUSE;
                buffers_to_write--;
                buffers_pending++;

                pthread_mutex_unlock(&buffer_mutex);

                DEBUG("Writing data in buffer %i (size %i)\n",i,
                        write_buffers[i].size);
                /*
                 * Do the actual write
                 */
                write_data(args->outfd,write_buffers[i].buffer,
                        write_buffers[i].size,1,write_buffers[i].offset);
                total_written += write_buffers[i].size;

#ifdef DEVELOPING 
                /*
                 * Every once in a while, let the user know how we're doing
                 */
                if (total_written % (STAGE2_OUTBUF_SIZE * 10) == 0) {
                    INFO("Sorted bytes written: %lu (stage2 %.2f%% done)\n",
                            total_written, total_written * 100.0 / file_size);
                }
#endif 
                DEBUG("Write done\n");
                /*
                 * Toss this buffer back to the sorting thread
                 */
                pthread_mutex_lock(&buffer_mutex);

                buffers_pending--;
                buffers_empty++;
                write_buffers[i].state = BUFFER_EMPTY;

                pthread_cond_signal(&buffer_io_done_cond);
            }
        }

        pthread_mutex_unlock(&buffer_mutex);

#ifndef STAGE2_LOAD_SYNC 
        /*
         * Looks for stacks that need reloading - this is a slow way to do
         * this, but I don't want to worry about a more complicated data
         * structure right now.
         */
        for (i = 0; i < n_chunks; i++) {
            /* if (entries[i].remaining == 0) { */
            if (entries[i].remaining <= 0) {
                /*
                 * Load the stack
                 */
                load_stack(args->tmpfd,&(entries[i]));

                /*
                 * Tell the sorting thread we're done
                 */
                pthread_mutex_lock(&buffer_mutex);
                stacks_to_load--;
                pthread_mutex_unlock(&buffer_mutex);
                pthread_cond_signal(&buffer_io_done_cond);
            }
        }
#endif 
    }

    INFO("Disk thread, stage 2 exiting\n");
    return NULL;

}

/*
 * Initialize the metadata for each stack, and load it from disk
 */
void load_stacks(int tmpfd, struct heap_entry *entries, FLOAT *stacks) {
    int i;
    FLOAT *stack_start;
    for (i = 0; i < n_chunks; i++) {
	entries[i].chunk_number = i;
	stack_start = (FLOAT*)((void*)stacks + stack_size * i);
	entries[i].stack_top = entries[i].stack_current = stack_start;
	entries[i].next_start_pos = 0;
	entries[i].end_of_chunk = 0;
	load_stack(tmpfd, entries + i);
    }
}

/*
 * Load a single stack
 */
void load_stack(int tmpfd, struct heap_entry *entry) {
    size_t size;
    size_t chunk_size;
    size_t remaining;
    DEBUG("Loading stack from chunk %i\n",entry->chunk_number);

    /*
     * Figure out how much of the chunk there is left to read
     */
    if (entry->chunk_number == (n_chunks - 1)) {
	chunk_size = last_chunk_size;
    } else { 
	chunk_size = CHUNK_SIZE;
    }

    size = stack_size;
    remaining = chunk_size - (entry->next_start_pos * 4);

    DEBUG("fbr: %i, cs: %i, nsp: %li\n",remaining, chunk_size,
            (unsigned long int)entry->next_start_pos);
    /*
     * If we're reading the end of the chunk, mark this in the header
     */
    if (remaining <= stack_size) {
	size = remaining;
	entry->end_of_chunk = 1;
	DEBUG("End of chunk\n");
    }

    /*
     * Do the read
     */
    read_data(tmpfd,entry->stack_top,size,1,(CHUNK_SIZE * entry->chunk_number) +
	                (entry->next_start_pos * 4));

    /* Convert from bytes to floats */
    entry->remaining = (size >> 2);
    DEBUG("set remaining to %i\n",entry->remaining);

    /*
     * Remember the next place to start reading from
     */
    entry->next_start_pos = entry->next_start_pos + (size >> 2);

    /*
     * Start the stack at the top
     */
    entry->stack_current = entry->stack_top;
}

/* -------------------------------------------------------------------------
 * Heap implementation
 * ------------------------------------------------------------------------- */

/*
 * Convenience macros
 */
#define PARENT(x) ((x-1) >> 1) 
#define LCHILD(x) (((x) << 1) + 1) 
#define RCHILD(x) (((x) << 1) + 2) 

/*
 * From a given starting position, bubble the heap entry up to its propper
 * location
 */
void min_heapify_up(struct heap_entry **heap, const uint32_t start) {
    int i = start;
    struct heap_entry *parent;
    struct heap_entry *this;
    this = heap[i];
    parent = heap[PARENT(i)];
    /*
     * Loop until we're the top element, or the parent element is
     * small (as its suppsed to be)
     */
    while ((i > 0) &&
           FLOAT_LT(*(this->stack_current),*(parent->stack_current))) {
        /*
         * Exchange with parent
         */
	heap[PARENT(i)] = this;
	heap[i] = parent;
	i = PARENT(i);
	this = heap[i];
	if (i > 0) {
	    parent = heap[PARENT(i)];
	}
    }
}

/*
 * From a given starting position, sink the heap entry down to its propper
 * location
 */
void min_heapify_down(struct heap_entry **heap, const uint32_t size,
        const uint32_t start) {
    int i = start;
    register uint32_t lchild, rchild, smallest;
    register struct heap_entry *tmp;

    /*
     * More complicated exit condition
     */
    while (1) {
	lchild = LCHILD(i);
	rchild = RCHILD(i);

        /*
         * Figure out which of the three (us, and our children) is the smallest
         */
        smallest = i;
        if ((lchild < size) && (HEAP(lchild) < HEAP(i))) {
            smallest = lchild;
        }
        if ((rchild < size) && (HEAP(rchild) < HEAP(smallest))) {
            smallest = rchild;
        }
        
        /*
         * If it's not us, swap with the smallest - otherwise, we're done
         */
        if (smallest != i) {
            tmp = heap[i];
            heap[i] = heap[smallest];
            heap[smallest] = tmp;
            i = smallest;
        } else {
            break;
        }
    }
}

/*
 * Pop an element off the heap - caller must decrement heap size
 */
void heap_pop(struct heap_entry **heap, uint32_t size) {
    DEBUG("Moving heap element at %i\n",size -1);
    DEBUG("Remaining is %i\n",heap[size -1]->remaining);
    heap[0] = heap[size -1];
    min_heapify_down(heap,size -1,0);
}

/*
 * Insert an element into the heap - caller must increment heap size
 */
void heap_insert(struct heap_entry **heap, uint32_t size,
        struct heap_entry *item) {
    heap[size] = item;
    min_heapify_up(heap,size);
}

/* -------------------------------------------------------------------------
 * Radix sort implementation
 * ------------------------------------------------------------------------- */
#define RADIX_SIZE 8 
#define RADIX_BUCKETS (256) 
#define RADIX_PASSES 4 

/*
 * For larger radix - broken
 */
/*
#define RADIX_SIZE 11
#define RADIX_BUCKETS (256 << 3)
#define RADIX_PASSES 3
*/
uint32_t *rad_sort(uint32_t *in, uint32_t *out, int count) {
    uint32_t buckets[RADIX_BUCKETS];
    uint32_t offsets[RADIX_BUCKETS];
    uint32_t bitmask;
    int i;
    int pass;

    DEBUG("rad_sort in = %p, out = %p\n",in, out);

    /*
     * Flip with Michael Herf's algorithm
     */
#ifdef FLOATFLIP 
    for (i = 0; i <= count; i++) {
	in[i] = in[i] ^ (-(int32_t)(in[i] >> 31) | 0x80000000);
    }
#endif 

    /*
     * Go through the passes
     */
    for (pass = 0; pass < RADIX_PASSES; pass++) {
	bitmask = 0;
	for (i = 0; i < RADIX_SIZE; i++) {
	    bitmask = (bitmask << 1) | 0x01;
	}
	bitmask = bitmask << (pass * RADIX_SIZE);
	DEBUG("Bitmask for pass %i is 0x%08x\n", pass, bitmask);

        /*
         * Clear out the buckets
         */
	bzero(buckets,RADIX_BUCKETS * sizeof(uint32_t));
	bzero(offsets,RADIX_BUCKETS * sizeof(uint32_t));

	/*
	 * First stage, fill the buckets
	 */
	for (i = 0; i < count; i++) {
	    uint32_t tmp = in[i];
	    unsigned char rad = (tmp & bitmask) >> (pass * RADIX_SIZE);
	    buckets[rad]++;
	}

	/*
	 * Second stage, make the offset table
	 */
	for (i = 1; i < RADIX_BUCKETS; i++) {
	    offsets[i] = offsets[i - 1] + buckets[i - 1];
	}

	/* 
	 * Finally, copy into the output buffer
	 */
	for (i = 0; i < count; i++) {
	    unsigned char rad;
	    rad = ((uint32_t)in[i] & bitmask) >> (pass * RADIX_SIZE);
	    out[offsets[rad]++] = in[i];
	}

        /*
         * Flip the input and output buffers for the next pass
         */
	uint32_t* tmp = out;
	out = in;
	in = tmp;
    }

    /*
     * Flip back
     */
#ifdef FLOATFLIP 
    for (i = 0; i <= count; i++) {
	in[i] = in[i] ^ (((in[i] >> 31) - 1) | 0x80000000);
    }
#endif 
    
    return in; 
}

/* -------------------------------------------------------------------------
 * Disk IO helper functions 
 * ------------------------------------------------------------------------- */

/*
 * Read data into a buffer, using the optimal blocksize
 */
inline void read_data(int infd, FLOAT *buffer, uint32_t length, int seek,
        off_t file_offset) {
    int i;

    /*
     * Seek if they asked us to
     */
    if (seek != 0) {
        if (lseek(infd,file_offset,SEEK_SET) == -1) {
            perror("Error seeking in input file");
            exit(-1);
        }
    }

#ifdef FS_BLOCKSIZE_IO 
    /*
     * Load in all blocks we can load at full blocksize
     */
    for (i = 0; i < (length / io_blocksize); i++) {
        read(infd,buffer,io_blocksize);
        buffer = (FLOAT*)((unsigned char *)buffer + io_blocksize);
    }

    /*
     * Load the last block
     */
    read(infd, buffer, length % io_blocksize);
#else 
    /*
     * Just slurp the whole thing in one read()
     */
    read(infd,buffer,length);
#endif 
}

/*
 * Read data into a buffer, using the optimal blocksize
 */
inline void write_data(int outfd, FLOAT *buffer, uint32_t length,
        int seek, off_t file_offset) {
    int i;
    /*
     * Seek if they asked us to
     */
    if (seek != 0) {
        DEBUG("Seeking to %li\n", (unsigned long int)file_offset);
        if (lseek(outfd,file_offset,SEEK_SET) == -1) {
            perror("Error seeking in output file");
            exit(-1);
        }
    }
#ifdef FS_BLOCKSIZE_IO 
    /*
     * Write all blocks we can load at full blocksize
     */
    for (i = 0; i < (length / io_blocksize); i++) {
        write(outfd,buffer,io_blocksize);
        buffer = (FLOAT*)((unsigned char *)buffer + io_blocksize);
    }
    /*
     * Write the last block
     */
    write(outfd, buffer, length % io_blocksize);
#else 
    /*
     * Just write the whole thing in one write()
     */
    write(outfd,buffer,length);
#endif 
}  

/* -------------------------------------------------------------------------
 * Other helpers
 * ------------------------------------------------------------------------- */

/*
 * Round a to the nearest b - round down, but never to 0
 */
int round_down(int a, int b) {
    int rounded;
    rounded = (a / b) * b;
    if (rounded > 0) {
        return rounded;
    } else {
        return a;
    }
}