/*
* supersort.c
*
* Copyright 2005 Robert Ricci
* -----------------------------------------------------------------------------
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* If you don't know how to find a copy of the GNU General Public License,
* well, welcome to the 21st century! Or, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
* -----------------------------------------------------------------------------
*
* Uses a two-pass algorithm: The first pass does a Radix sort on chunks (tens
* of MB) of floats from the input file, and writes the sorted chunks to the
* temporary file. The second stage does a parallel merge on all of the chunks
* using a heap. Obviously, we can only hold so much data to sort at a time, so
* we pull from the head of all chunks in parallel, going back to disk to get
* more when our data from each chunk runs dry.
*
* Known to compile on FreeBSD 5.x, 6.x, and Suse Linux 9 (should work on any
* Linux with gcc 3.X)
*
* Building:
* On FreeBSD: gcc -o supersort -pthread -O3 supersort.c
* On Linux: gcc -o supersort -lpthread -O3 supersort.c
*
* See options below
*/
static const char rcsid[] = "$Id: supersort.c,v 1.16 2005/11/28 07:38:38 ricci Exp $";
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <assert.h>
#include <pthread.h>
#include <sys/time.h>
#ifdef __linux__
#include <stdint.h>
#endif
/*
* Memory layouts:
* (NOTE: drawn upside down)
*
* ##### Stage 1:
* CHUNK_SIZE = SCRATCH_SIZE / (N_PROCESSORS + 1)
* scratch_bottom =
* ---------------------------------
* CHUNK_SIZE
* ---------------------------------
* repeat (N_PROCESSORS + 1) times
*
* stage 2:
* scratch_bottom =
* entries =
* ---------------------------------
* n_chunks * sizeof(struct entry)
* ---------------------------------
* heap =
* ---------------------------------
* n_chunks * sizeof(struct entry *)
* ---------------------------------
* write_buffers[x].buffer =
* ---------------------------------
* OUTBUF_SIZE
* ---------------------------------
* OUTBUF_SIZE
* ---------------------------------
* buffers =
* ---------------------------------
* leftover / n_chunks
* ---------------------------------
*/
/*
* Options controlling sort behavior (all sizes are in bytes)
*/
/*
* Number of processors this sort will be run on
*/
#define N_PROCESSORS 2
/*
* Pick a sort - only define one of these
*/
/* My own implementation of Radix Sort */
#define RADSORT
/* The C library's implementation of quicksort */
/* #define LIB_QSORT */
/*
* Use the flipping trick to handle negative floats in the radix sort
*/
#define FLOATFLIP
/*
* Round sizes of data structures to disk block sizes
*/
#define ROUND_SIZES
/*
* Use integer instructions for FP compares
*/
// #define INTEGER_FP
/*
* How many sorting threads to run (note, we are disk bound, so making this
* higher than 1 doesn't really help)
*/
#define N_SORT_THREADS N_PROCESSORS
/*
* Do we try to optimize disk I/O by doing transfers that are the size of the
* filesystem blocksize?
*/
#define FS_BLOCKSIZE_IO
/*
* Amount of memory we malloc() on the heap for buffering, etc.
*/
#define SCRATCH_SIZE (64 << 20) // 64 MByte
/*
* Size and number of buffers to use for write buffering in the second stage
*/
#define N_WRITE_BUFFERS 2
#define STAGE2_OUTBUF_SIZE (1048576/2)
/*
* Do sync loads in stage 2
*/
/* #define STAGE2_LOAD_SYNC */
/*
* Name of our temporary file
*/
#define TMPFILE "supersort.tmp"
/*
* Turn on various informational messages, as well as some timing code
*/
#define DEVELOPING
/*
* Turn on gobs of debugging output
*/
/* #define DEBUGGING */
/*
* End of options
*/
/*
* We have one extra chunk buffer so that each CPU can be sorting while the
* disk is being read.
*/
#define N_CHUNKBUFFERS (N_PROCESSORS + 1)
#ifndef INTEGER_FP
#define FLOAT float
#define FLOAT_LT(a,b) ((a) < (b))
#define FLOAT_EQ(a,b) ((a) == (b))
#define RADCAST (uint32_t*)
#else
/*
* No floating point - use the formulas for doing FP using integer instructions
* from Hacker's Delight
*/
#define FLOAT uint32_t
#define FLOAT_LT(a,b) ((((int32_t)(a) >= 0) & ((int32_t)(a) < (int32_t)(b))) | (((int32_t)(a) < 0) & ((a) > (b))))
#define FLOAT_EQ(a,b) ((a) == (b))
#define RADCAST
#endif
/*
* Bottom and top of the scratch buffer
*/
void *scratch_bottom;
void *scratch_top;
/*
* If we're doing a radix sort, we have to cut the chunk size in half, because
* it's not an inplace sort.
*/
#ifdef RADSORT
uint32_t CHUNK_SIZE = ((SCRATCH_SIZE / (N_CHUNKBUFFERS)) / 2);
#else
uint32_t CHUNK_SIZE = (SCRATCH_SIZE / (N_CHUNKBUFFERS));
#endif
/*
* Information about the chunk buffers for stage 1, used by the sort threads
* and the disk thread to communicate.
*/
typedef enum { CHUNK_EMPTY, CHUNK_FULL, CHUNK_SORTED, CHUNK_INUSE } chunk_state_t;
struct chunk_info_t {
int number;
int size;
chunk_state_t state;
FLOAT *read_buffer;
FLOAT *write_buffer;
};
int chunks_to_read, chunks_to_write, chunks_to_sort, chunks_inuse;
struct chunk_info_t chunk_info[N_CHUNKBUFFERS];
/* Used to lock the chunk metadata */
pthread_mutex_t chunk_mutex;
/* Signaled when a read into a chunk buffer is done */
pthread_cond_t chunk_read_cond;
/* Signaled when a chunk buffer has been sorted */
pthread_cond_t chunk_sorted_cond;
/*
* Information about the write buffers for stage 2 - used by the sorting
* and disk threads to communicate.
*/
typedef enum { BUFFER_EMPTY, BUFFER_FULL, BUFFER_INUSE } buffer_state_t;
struct buffer_info_t {
FLOAT *buffer;
int size;
buffer_state_t state;
off_t offset;
};
struct buffer_info_t write_buffers[N_WRITE_BUFFERS];
/* Used to lock the buffer metadata */
pthread_mutex_t buffer_mutex;
/* Used to singal that a write buffer flush is being requested */
pthread_cond_t buffer_io_cond;
/* Used to singal that a write buffer flush is finished */
pthread_cond_t buffer_io_done_cond;
/*
* Argments for the disk stages - necessary, because pthreads only allows you
* to pass a single argument pointer to forked threads.
*/
struct disk_stage1_args {
int infd;
int tmpfd;
};
struct disk_stage2_args {
int tmpfd;
int outfd;
};
/*
* 'bools' used to tell threads when a stage is done
*/
int stage1_done;
int state2_done;
/* Now calculated dynamically below */
uint32_t stack_size = 0;
/*
* Convenience macro, to get the top element of any element in the heap
*/
#define HEAP(x) (*(heap[(x)]->stack_current))
/*
* Output Macros
*/
#ifdef DEBUGGING
#define DEBUG(format, ...) fprintf(stderr,format, ## __VA_ARGS__)
#else
#define DEBUG(...)
#endif
#ifdef DEVELOPING
#define INFO(format, ...) fprintf(stderr,format, ## __VA_ARGS__)
#else
#define INFO(format, ...)
#endif
/*
* Entry on the heap - must be defined here because many things below reference
* it.
*/
struct heap_entry {
FLOAT *stack_top;
FLOAT *stack_current;
uint32_t chunk_number;
uint32_t remaining;
uint32_t end_of_chunk;
off_t next_start_pos;
};
/*
* Forward declarations of helper functions below.
* Whew - that's a lot!
*/
inline void read_data(int, FLOAT*, uint32_t, int, off_t);
inline void write_data(int, FLOAT*, uint32_t, int, off_t);
int round_down(int,int);
void stage1(int,int);
void *disk_stage1(void *);
void stage2(int,int);
void *disk_stage2(void *);
void *sort_stage1(void *);
void load_stack(int, struct heap_entry*);
void load_stacks(int, struct heap_entry *, FLOAT *);
void min_heapify_down(struct heap_entry**, const uint32_t, const uint32_t);
void heap_pop(struct heap_entry**, uint32_t);
void heap_insert(struct heap_entry**, uint32_t, struct heap_entry*);
#ifdef RADSORT
uint32_t *rad_sort(uint32_t *in, uint32_t *out, int count);
#endif
off_t file_size;
int main(int argc, char **argv) {
int infd, outfd, tmpfd;
#ifdef DEVELOPING
struct timeval stage1_start,stage1_stop,stage2_start,stage2_stop,
stage1_total, stage2_total, total;
float stage1_rate, stage2_rate, total_rate;
#endif
if (argc != 3) {
fprintf(stderr,"usage: %s <infile> <outfile>\n",argv[0]);
exit(-1);
}
/*
* Open up our files
*/
/*
* TODO: Open any of these with O_DIRECT? (portable?)
*/
if ((infd = open(argv[1],O_RDONLY)) < 1) {
fprintf(stderr,"Error opening input file %s\n",argv[1]);
/* strerror(errno)); Linux does not like this */
exit(-1);
}
if ((outfd =
open(argv[2],O_WRONLY|O_CREAT|O_TRUNC,S_IWUSR | S_IRUSR)) < 1) {
fprintf(stderr,"Error opening output file %s: %s\n",argv[2],
strerror(errno));
exit(-1);
}
if ((tmpfd = open(TMPFILE,O_RDWR|O_CREAT|O_TRUNC, S_IWUSR | S_IRUSR)) < 1) {
fprintf(stderr,"Error opening temp file %s: %s\n",TMPFILE,
strerror(errno));
exit(-1);
}
/*
* Thanks to UNIX semantics, this file will not really go away until we
* close it. This way, if we crash, it definitely gets removed.
*/
#ifndef DEVELOPING
unlink(TMPFILE);
#endif
/*
* Grab some space on the heap to work in
* No malloc() allowed after this point!
* Note - pthreads uses malloc, so this option is deprecated
*/
#ifdef ALLOC_SBRK
scratch_bottom = sbrk(0);
if (sbrk(SCRATCH_SIZE) == (void*)-1) {
perror("Unable to allocate scratch memory:");
exit(-1);
}
scratch_top = sbrk(0);
#else
scratch_bottom = malloc(SCRATCH_SIZE);
if (!scratch_bottom) {
perror("Unable to allocate scratch memory:");
exit(-1);
}
scratch_top = scratch_bottom + SCRATCH_SIZE;
#endif
#ifdef DEVELOPING
gettimeofday(&stage1_start,NULL);
#endif
/*
* Run the first stage!
*/
stage1(infd, tmpfd);
#ifdef DEVELOPING
gettimeofday(&stage1_stop,NULL);
#endif
INFO("Switching stages\n");
#ifdef DEVELOPING
gettimeofday(&stage2_start,NULL);
#endif
/*
* Run the second stage.
*/
stage2(tmpfd, outfd);
#ifdef DEVELOPING
gettimeofday(&stage2_stop,NULL);
#endif
close(infd);
close(tmpfd);
close(outfd);
/*
* Print out a lot of fun timings
*/
#ifdef DEVELOPING
timersub(&stage1_stop,&stage1_start,&stage1_total);
timersub(&stage2_stop,&stage2_start,&stage2_total);
timersub(&stage2_stop,&stage1_start,&total);
stage1_rate = (file_size / 1024 / 1024) /
(stage1_total.tv_sec + stage1_total.tv_usec/1000000.0);
stage2_rate = (file_size / 1024 / 1024) /
(stage2_total.tv_sec + stage2_total.tv_usec/1000000.0);
total_rate = (file_size / 1024 / 1024) /
(total.tv_sec + total.tv_usec/1000000.0);
INFO("Stage 1: %li.%06li s (%f MB/sec)\n", stage1_total.tv_sec,
stage1_total.tv_usec, stage1_rate);
INFO("Stage 2: %li.%06li s (%f MB/sec)\n", stage2_total.tv_sec,
stage2_total.tv_usec, stage2_rate);
INFO("Total : %li.%06li s (%f MB/sec)\n", total.tv_sec,
total.tv_usec, total_rate);
#endif
exit(0);
}
/*
* Passed as a comparator to the C library's qsort() function
*/
#ifndef RADSORT
inline int float_comparator(const FLOAT *a, const FLOAT *b) {
if (FLOAT_LT(*a,*b)) { return -1; }
if (FLOAT_EQ(*a,*b)) { return 0; }
return 1;
}
#endif
/*
* Number of chunks in the file
*/
uint32_t n_chunks;
/*
* Size of the last chunk, since it might be smaller than the others.
*/
off_t last_chunk_size;
/*
* Size we decide on for disk IO
*/
size_t io_blocksize;
/* -------------------------------------------------------------------------
* Stage 1
* ------------------------------------------------------------------------- */
/*
* First stage: Sort chunks of the input file into the output file
*/
void stage1(int infd, int tmpfd) {
struct stat sb;
int i;
pthread_t disk_thread;
struct disk_stage1_args args;
pthread_t sort_threads[N_SORT_THREADS];
/*
* Find out how many chunks there are in the file and the optimal IO
* blocksize
*/
fstat(infd,&sb);
file_size = sb.st_size;
io_blocksize = sb.st_blksize;
INFO("Using I/O blocksize %i\n",io_blocksize);
#ifdef ROUND_SIZES
CHUNK_SIZE = round_down(CHUNK_SIZE,io_blocksize);
#endif
n_chunks = file_size / CHUNK_SIZE;
last_chunk_size = file_size % CHUNK_SIZE;
if (last_chunk_size == 0) {
last_chunk_size = CHUNK_SIZE;
} else {
n_chunks++;
}
INFO("Using %i chunks of size %i\n",n_chunks,CHUNK_SIZE);
INFO("Last chunk size %li\n",(unsigned long int)last_chunk_size);
/*
* Set up threading state
*/
for (i = 0; i < N_CHUNKBUFFERS; i++) {
chunk_info[i].state = CHUNK_EMPTY;
#ifdef RADSORT
chunk_info[i].read_buffer = scratch_bottom + i * CHUNK_SIZE * 2;
#else
chunk_info[i].read_buffer = scratch_bottom + i * CHUNK_SIZE;
chunk_info[i].write_buffer = scratch_bottom + i * CHUNK_SIZE;
#endif
}
pthread_mutex_init(&chunk_mutex,NULL);
pthread_cond_init(&chunk_read_cond,NULL);
pthread_cond_init(&chunk_sorted_cond,NULL);
chunks_to_read = chunks_to_write = chunks_to_sort = 0;
/*
* Launch disk thread
*/
stage1_done = 0;
args.infd = infd;
args.tmpfd = tmpfd;
if (pthread_create(&disk_thread,NULL,disk_stage1, &args)) {
perror("Error creating disk thread");
exit(-1);
}
/*
* Launch sorting threads
*/
for (i = 0; i < N_SORT_THREADS; i++) {
if (pthread_create(&(sort_threads[i]),NULL,sort_stage1,NULL)) {
perror("Error creating disk thread");
exit(-1);
}
}
/*
* Wait for the sorting threads to finish
*/
pthread_join(sort_threads[0],NULL);
}
/*
* Stage 1 sorting routine - radix sort chunk buffers
*/
void *sort_stage1(void *nuthin) {
int i;
INFO("Sorting thread, stage 1 starting\n");
/*
* Wait for the disk thread to decide we're done
*/
while (!stage1_done) {
DEBUG("Sort thread loop\n");
/*
* Wait for work to do, or for the stage to be over
*/
pthread_mutex_lock(&chunk_mutex);
while(!chunks_to_sort && !stage1_done) {
pthread_cond_wait(&chunk_read_cond,&chunk_mutex);
}
/*
* Look for a FULL chunk, and start working on it, making it an INUSE
* chunk.
*/
for (i = 0; i < N_CHUNKBUFFERS; i++) {
if (chunk_info[i].state == CHUNK_FULL) {
FLOAT* out;
chunk_info[i].state = CHUNK_INUSE;
chunks_to_sort--;
chunks_inuse++;
pthread_mutex_unlock(&chunk_mutex);
/* INFO("Sorting chunk %i\n",chunk_info[i].number); */
#ifdef LIB_QSORT
DEBUG("Calling qsort\n");
qsort(chunk_info[i].read_buffer,chunk_info[i].size / 4,
4,&float_comparator);
#endif
#ifdef RADSORT
/*
* Call the radix sort - since radix is not an inplace sort,
* we have to let it tell us where it has put the output
*/
out = chunk_info[i].read_buffer + CHUNK_SIZE/4;
out = (FLOAT*) rad_sort(RADCAST chunk_info[i].read_buffer,
RADCAST out, chunk_info[i].size / 4);
chunk_info[i].write_buffer = out;
#endif
/*
* Let the other threads know we've finished sorting this
* chunk.
*/
pthread_mutex_lock(&chunk_mutex);
chunk_info[i].state = CHUNK_SORTED;
chunks_inuse--;
chunks_to_write++;
pthread_cond_signal(&chunk_sorted_cond);
}
}
pthread_mutex_unlock(&chunk_mutex);
}
INFO("Sorting thread, stage 1 exiting\n");
return NULL;
}
/*
* Disk thread for stage 1 - read and write chunk buffers
*/
void *disk_stage1(void *_args) {
int i;
uint32_t current_read_chunk;
struct disk_stage1_args *args = (struct disk_stage1_args*)_args;
INFO("Disk thread, stage 1 starting\n");
/*
* Initialize all of the chunk metadata
*/
chunks_to_sort = 0;
chunks_to_write = 0;
chunks_inuse = 0;
if (n_chunks > N_CHUNKBUFFERS) {
chunks_to_read = N_CHUNKBUFFERS;
} else {
chunks_to_read = n_chunks;
}
/*
* Loop looking for work to do
*/
current_read_chunk = 0;
while (chunks_to_write || chunks_to_read || chunks_to_sort ||
chunks_inuse) {
DEBUG("Disk thread loop\n");
/*
* Wait for a chunk to be ready to read or write
*/
pthread_mutex_lock(&chunk_mutex);
while (!chunks_to_read && !chunks_to_write) {
pthread_cond_wait(&chunk_sorted_cond,&chunk_mutex);
}
/*
* Go through the chunk buffers, looking for one to do work on
*/
for (i = 0; i < N_CHUNKBUFFERS; i++) {
switch (chunk_info[i].state) {
case CHUNK_EMPTY:
if (current_read_chunk < n_chunks) {
/* INFO("Reading chunk %i\n",current_read_chunk); */
/*
* Let other threads know we're reading this one
*/
chunk_info[i].state = CHUNK_INUSE;
chunks_inuse++;
if (current_read_chunk == (n_chunks - 1)) {
chunk_info[i].size = last_chunk_size;
} else {
chunk_info[i].size = CHUNK_SIZE;
}
chunk_info[i].number = current_read_chunk++;
pthread_mutex_unlock(&chunk_mutex);
DEBUG("Set size to %i\n",chunk_info[i].size);
/*
* Do the actual read, using our helper that does
* optimal size block IO
*/
read_data(args->infd,chunk_info[i].read_buffer,
chunk_info[i].size,0,0);
DEBUG("Chunk read finished\n");
/*
* Let sorting threads know they can now sort this
* chunk
*/
pthread_mutex_lock(&chunk_mutex);
chunk_info[i].state = CHUNK_FULL;
chunks_inuse--;
chunks_to_read--;
chunks_to_sort++;
pthread_cond_signal(&chunk_read_cond);
} else {
/*
* We've read the last chunk
*/
chunks_to_read = 0;
}
break;
case CHUNK_SORTED:
/*
* Let other threads know we're writing this chunk
*/
chunk_info[i].state = CHUNK_INUSE;
chunks_inuse++;
pthread_mutex_unlock(&chunk_mutex);
INFO("Writing chunk %i (stage1 %.2f%% done)\n",
chunk_info[i].number,
chunk_info[i].number*100.0 / n_chunks);
/*
* Do the actual write
*/
write_data(args->tmpfd,chunk_info[i].write_buffer,
chunk_info[i].size,1,
CHUNK_SIZE * chunk_info[i].number);
/*
* We'll find out on the next pass through the loop that
* this chunk is ready to read
*/
pthread_mutex_lock(&chunk_mutex);
chunks_to_write--;
chunks_to_read++;
chunks_inuse--;
chunk_info[i].state = CHUNK_EMPTY;
break;
default:
; /* Nothing to do */
}
}
pthread_mutex_unlock(&chunk_mutex);
}
INFO("Disk thread, stage 1 exiting\n");
/*
* Flag that the stage is done, and wake up the sorter threads so they'll
* notice.
*/
pthread_mutex_lock(&chunk_mutex);
stage1_done = 1;
pthread_cond_broadcast(&chunk_read_cond);
pthread_mutex_unlock(&chunk_mutex);
return NULL;
}
/* -------------------------------------------------------------------------
* Stage 2
* ------------------------------------------------------------------------- */
/*
* This messy stuff is used to keep track of all of the things we're doing with
* the write buffers
*/
int buffers_to_write;
int buffers_inuse;
int buffers_empty;
int buffers_pending;
int stacks_to_load;
/*
* Used to tell the disk thread we're done with this stage
*/
int stage2_done;
/*
* Place where the heap entries reside
*/
struct heap_entry *entries;
/*
* Keep track of how much we've written for informational purposes
*/
unsigned long total_written = 0;
/*
* Stage 2 - Do a parallel merge of all chunks
*/
void stage2(int tmpfd, int outfd) {
struct heap_entry **heap;
uint32_t heap_size;
int i;
FLOAT *stacks;
int current_write_buffer;
FLOAT *outbuf;
pthread_t disk_thread;
struct disk_stage2_args disk_args;
size_t disk_offset;
/*
* Do the memory layout for stage 1
*/
/* Stack entries go at the bottom */
entries = (struct heap_entry*)scratch_bottom;
/* Leave space for n_chunks heap entries, then the heap starts */
heap = (struct heap_entry**)((struct heap_entry*)entries + n_chunks);
/* Leave space for a heap of size heap_size, then start the write buffers */
write_buffers[0].buffer = (FLOAT*)((struct heap_entry**)heap + n_chunks);
/* Pack in all of the write buffers */
for (i = 1; i < N_WRITE_BUFFERS; i++) {
write_buffers[i].buffer =
(FLOAT*)((void*)write_buffers[i-1].buffer + STAGE2_OUTBUF_SIZE);
}
/* Stacks start after the end of the write buffers */
stacks = (FLOAT*)((void*)write_buffers[N_WRITE_BUFFERS - 1].buffer
+ STAGE2_OUTBUF_SIZE);
/*
* Figure out what size stacks we can fit in the space allocated to them
*/
stack_size = (scratch_top - (void*)stacks) / n_chunks;
#ifdef ROUND_SIZES
stack_size = round_down(stack_size, io_blocksize);
#endif
INFO("%i bytes left in memory, using stacks of size %i\n",
(scratch_top - (void*)stacks), stack_size);
INFO("entries: %p, heap: %p, stacks: %p\n",
entries,heap,stacks);
/*
* Read in the intial contents of all of the stacks
*/
heap_size = 0;
load_stacks(tmpfd,entries,stacks);
for (i = 0; i < n_chunks; i++) {
heap_insert(heap,heap_size,entries + i);
heap_size++;
}
/*
* Set up thread state
*/
pthread_cond_init(&buffer_io_cond,NULL);
pthread_cond_init(&buffer_io_done_cond,NULL);
pthread_mutex_init(&buffer_mutex,NULL);
stacks_to_load = 0;
stage2_done = 0;
/*
* Write buffers start empty
*/
for (i = 0; i < N_WRITE_BUFFERS; i++) {
write_buffers[i].size = 0;
write_buffers[i].state = BUFFER_EMPTY;
}
buffers_empty = N_WRITE_BUFFERS;
/*
* Grab the first write buffer to write sorted output into
*/
current_write_buffer = 0;
outbuf = write_buffers[current_write_buffer].buffer;
write_buffers[current_write_buffer].state = BUFFER_INUSE;
write_buffers[current_write_buffer].offset = 0;
buffers_inuse = 1;
buffers_empty--;
/*
* Start writing at the beginning of the output file
*/
disk_offset = 0;
/*
* Fork off a disk thread
*/
disk_args.tmpfd = tmpfd;
disk_args.outfd = outfd;
if (pthread_create(&disk_thread,NULL,disk_stage2,&disk_args)) {
perror("Creating disk thread");
exit(-1);
}
/*
* Keep going until we've emptied the heap
*/
while (heap_size > 0) {
DEBUG("Heap size: %i\n",heap_size);
/*
* If we've filled up an output buffer, tell the disk thread to
* flush it, and grab another write buffer
*/
if (write_buffers[current_write_buffer].size == STAGE2_OUTBUF_SIZE) {
DEBUG("Flushing %i to disk - %i\n",current_write_buffer,
write_buffers[current_write_buffer].size);
/*
* Tell the disk thread we're ready to flush
*/
pthread_mutex_lock(&buffer_mutex);
write_buffers[current_write_buffer].state = BUFFER_FULL;
buffers_inuse--;
buffers_to_write++;
pthread_cond_signal(&buffer_io_cond);
disk_offset += write_buffers[current_write_buffer].size;
/* Unlock so that the disk thread gets to run */
pthread_mutex_unlock(&buffer_mutex);
/*
* Find a new write buffer to use
*/
pthread_mutex_lock(&buffer_mutex);
/* Maybe we've gotten ahead of the disk thread */
while (!buffers_empty) {
/* INFO("Waiting for a free buffer\n"); */
pthread_cond_wait(&buffer_io_done_cond,&buffer_mutex);
}
/*
* Find a new write buffer to use
*/
for (i = 0; i < N_WRITE_BUFFERS; i++) {
if (write_buffers[i].state == BUFFER_EMPTY) {
write_buffers[i].state = BUFFER_INUSE;
write_buffers[i].size = 0;
write_buffers[i].offset = disk_offset;
buffers_empty--;
buffers_inuse++;
current_write_buffer = i;
break;
}
}
DEBUG("Picked buffer %i\n",current_write_buffer);
pthread_mutex_unlock(&buffer_mutex);
outbuf = write_buffers[current_write_buffer].buffer;
}
DEBUG("heap: %p, cs: %p\n",heap,outbuf);
/*
* Put the top of the heap into the output buffer
*/
*outbuf = HEAP(0);
(write_buffers[current_write_buffer].size) += 4;
outbuf++;
/*
* Pop the top stack in the heap
*/
(heap[0]->stack_current)++;
heap[0]->remaining--;
DEBUG("remaining: %i\n",heap[0]->remaining);
/*
* This stack is empty - decide what to do with it
*/
if (heap[0]->remaining == 0) {
/*
* If this stack was the last in its chunk, just pop it off the
* stack
*/
if (heap[0]->end_of_chunk) {
DEBUG("popping\n");
heap_pop(heap,heap_size);
heap_size--;
} else {
/*
* Otherwise, we have to reload the stack
*/
DEBUG("load\n");
#ifdef STAGE2_LOAD_SYNC
load_stack(tmpfd,heap[0]);
#else
/*
* Tell the disk tread we've got a stack for it to read
*/
pthread_mutex_lock(&buffer_mutex);
stacks_to_load++;
pthread_cond_signal(&buffer_io_cond);
/*
* Wait for it to finish
*/
while (!heap[0]->remaining) {
pthread_cond_wait(&buffer_io_done_cond,&buffer_mutex);
}
pthread_mutex_unlock(&buffer_mutex);
#endif
/*
* Re-heapify, since the stack has been reloaded
*/
min_heapify_down(heap,heap_size,0);
}
} else { /* if (heap[0]->remaining == 0) */
/*
* Re-heapify in case the pop made this stack no longer the
* smallest.
*/
min_heapify_down(heap,heap_size,0);
}
} /* while (heap_size > 0) */
/*
* Okay, we're done merging, but we have to tell the disk thread to
* flush the last write buffer we used
*/
INFO("Flushing to disk\n");
pthread_mutex_lock(&buffer_mutex);
write_buffers[current_write_buffer].state = BUFFER_FULL;
buffers_inuse--;
buffers_to_write++;
pthread_cond_signal(&buffer_io_cond);
stage2_done = 1;
/*
* Unlock so that the disk thread gets to run
*/
pthread_mutex_unlock(&buffer_mutex);
/*
* Wait for the disk thread to finish
*/
pthread_join(disk_thread,NULL);
}
/*
* Disk thread for stage 2 - write out write buffers and read in stacks
*/
void *disk_stage2(void *_args) {
struct disk_stage2_args *args = (struct disk_stage2_args*)_args;
int i;
INFO("Disk thread, stage 2 starting\n");
/*
* As long as there's some work to do, stay in this loop
*/
while (!stage2_done || buffers_pending || buffers_to_write) {
DEBUG("Disk thread here: %i %i %i %i\n",stage2_done,buffers_pending,
buffers_to_write,stacks_to_load);
pthread_mutex_lock(&buffer_mutex);
/*
* Wait for actual work to do
*/
while (!buffers_to_write && !stacks_to_load) {
DEBUG("Disk thread waiting\n");
pthread_cond_wait(&buffer_io_cond,&buffer_mutex);
}
/*
* Look for buffers to write
*/
for (i = 0; i < N_WRITE_BUFFERS; i++) {
if (write_buffers[i].state == BUFFER_FULL) {
/*
* Mark it as ours
*/
write_buffers[i].state = BUFFER_INUSE;
buffers_to_write--;
buffers_pending++;
pthread_mutex_unlock(&buffer_mutex);
DEBUG("Writing data in buffer %i (size %i)\n",i,
write_buffers[i].size);
/*
* Do the actual write
*/
write_data(args->outfd,write_buffers[i].buffer,
write_buffers[i].size,1,write_buffers[i].offset);
total_written += write_buffers[i].size;
#ifdef DEVELOPING
/*
* Every once in a while, let the user know how we're doing
*/
if (total_written % (STAGE2_OUTBUF_SIZE * 10) == 0) {
INFO("Sorted bytes written: %lu (stage2 %.2f%% done)\n",
total_written, total_written * 100.0 / file_size);
}
#endif
DEBUG("Write done\n");
/*
* Toss this buffer back to the sorting thread
*/
pthread_mutex_lock(&buffer_mutex);
buffers_pending--;
buffers_empty++;
write_buffers[i].state = BUFFER_EMPTY;
pthread_cond_signal(&buffer_io_done_cond);
}
}
pthread_mutex_unlock(&buffer_mutex);
#ifndef STAGE2_LOAD_SYNC
/*
* Looks for stacks that need reloading - this is a slow way to do
* this, but I don't want to worry about a more complicated data
* structure right now.
*/
for (i = 0; i < n_chunks; i++) {
/* if (entries[i].remaining == 0) { */
if (entries[i].remaining <= 0) {
/*
* Load the stack
*/
load_stack(args->tmpfd,&(entries[i]));
/*
* Tell the sorting thread we're done
*/
pthread_mutex_lock(&buffer_mutex);
stacks_to_load--;
pthread_mutex_unlock(&buffer_mutex);
pthread_cond_signal(&buffer_io_done_cond);
}
}
#endif
}
INFO("Disk thread, stage 2 exiting\n");
return NULL;
}
/*
* Initialize the metadata for each stack, and load it from disk
*/
void load_stacks(int tmpfd, struct heap_entry *entries, FLOAT *stacks) {
int i;
FLOAT *stack_start;
for (i = 0; i < n_chunks; i++) {
entries[i].chunk_number = i;
stack_start = (FLOAT*)((void*)stacks + stack_size * i);
entries[i].stack_top = entries[i].stack_current = stack_start;
entries[i].next_start_pos = 0;
entries[i].end_of_chunk = 0;
load_stack(tmpfd, entries + i);
}
}
/*
* Load a single stack
*/
void load_stack(int tmpfd, struct heap_entry *entry) {
size_t size;
size_t chunk_size;
size_t remaining;
DEBUG("Loading stack from chunk %i\n",entry->chunk_number);
/*
* Figure out how much of the chunk there is left to read
*/
if (entry->chunk_number == (n_chunks - 1)) {
chunk_size = last_chunk_size;
} else {
chunk_size = CHUNK_SIZE;
}
size = stack_size;
remaining = chunk_size - (entry->next_start_pos * 4);
DEBUG("fbr: %i, cs: %i, nsp: %li\n",remaining, chunk_size,
(unsigned long int)entry->next_start_pos);
/*
* If we're reading the end of the chunk, mark this in the header
*/
if (remaining <= stack_size) {
size = remaining;
entry->end_of_chunk = 1;
DEBUG("End of chunk\n");
}
/*
* Do the read
*/
read_data(tmpfd,entry->stack_top,size,1,(CHUNK_SIZE * entry->chunk_number) +
(entry->next_start_pos * 4));
/* Convert from bytes to floats */
entry->remaining = (size >> 2);
DEBUG("set remaining to %i\n",entry->remaining);
/*
* Remember the next place to start reading from
*/
entry->next_start_pos = entry->next_start_pos + (size >> 2);
/*
* Start the stack at the top
*/
entry->stack_current = entry->stack_top;
}
/* -------------------------------------------------------------------------
* Heap implementation
* ------------------------------------------------------------------------- */
/*
* Convenience macros
*/
#define PARENT(x) ((x-1) >> 1)
#define LCHILD(x) (((x) << 1) + 1)
#define RCHILD(x) (((x) << 1) + 2)
/*
* From a given starting position, bubble the heap entry up to its propper
* location
*/
void min_heapify_up(struct heap_entry **heap, const uint32_t start) {
int i = start;
struct heap_entry *parent;
struct heap_entry *this;
this = heap[i];
parent = heap[PARENT(i)];
/*
* Loop until we're the top element, or the parent element is
* small (as its suppsed to be)
*/
while ((i > 0) &&
FLOAT_LT(*(this->stack_current),*(parent->stack_current))) {
/*
* Exchange with parent
*/
heap[PARENT(i)] = this;
heap[i] = parent;
i = PARENT(i);
this = heap[i];
if (i > 0) {
parent = heap[PARENT(i)];
}
}
}
/*
* From a given starting position, sink the heap entry down to its propper
* location
*/
void min_heapify_down(struct heap_entry **heap, const uint32_t size,
const uint32_t start) {
int i = start;
register uint32_t lchild, rchild, smallest;
register struct heap_entry *tmp;
/*
* More complicated exit condition
*/
while (1) {
lchild = LCHILD(i);
rchild = RCHILD(i);
/*
* Figure out which of the three (us, and our children) is the smallest
*/
smallest = i;
if ((lchild < size) && (HEAP(lchild) < HEAP(i))) {
smallest = lchild;
}
if ((rchild < size) && (HEAP(rchild) < HEAP(smallest))) {
smallest = rchild;
}
/*
* If it's not us, swap with the smallest - otherwise, we're done
*/
if (smallest != i) {
tmp = heap[i];
heap[i] = heap[smallest];
heap[smallest] = tmp;
i = smallest;
} else {
break;
}
}
}
/*
* Pop an element off the heap - caller must decrement heap size
*/
void heap_pop(struct heap_entry **heap, uint32_t size) {
DEBUG("Moving heap element at %i\n",size -1);
DEBUG("Remaining is %i\n",heap[size -1]->remaining);
heap[0] = heap[size -1];
min_heapify_down(heap,size -1,0);
}
/*
* Insert an element into the heap - caller must increment heap size
*/
void heap_insert(struct heap_entry **heap, uint32_t size,
struct heap_entry *item) {
heap[size] = item;
min_heapify_up(heap,size);
}
/* -------------------------------------------------------------------------
* Radix sort implementation
* ------------------------------------------------------------------------- */
#define RADIX_SIZE 8
#define RADIX_BUCKETS (256)
#define RADIX_PASSES 4
/*
* For larger radix - broken
*/
/*
#define RADIX_SIZE 11
#define RADIX_BUCKETS (256 << 3)
#define RADIX_PASSES 3
*/
uint32_t *rad_sort(uint32_t *in, uint32_t *out, int count) {
uint32_t buckets[RADIX_BUCKETS];
uint32_t offsets[RADIX_BUCKETS];
uint32_t bitmask;
int i;
int pass;
DEBUG("rad_sort in = %p, out = %p\n",in, out);
/*
* Flip with Michael Herf's algorithm
*/
#ifdef FLOATFLIP
for (i = 0; i <= count; i++) {
in[i] = in[i] ^ (-(int32_t)(in[i] >> 31) | 0x80000000);
}
#endif
/*
* Go through the passes
*/
for (pass = 0; pass < RADIX_PASSES; pass++) {
bitmask = 0;
for (i = 0; i < RADIX_SIZE; i++) {
bitmask = (bitmask << 1) | 0x01;
}
bitmask = bitmask << (pass * RADIX_SIZE);
DEBUG("Bitmask for pass %i is 0x%08x\n", pass, bitmask);
/*
* Clear out the buckets
*/
bzero(buckets,RADIX_BUCKETS * sizeof(uint32_t));
bzero(offsets,RADIX_BUCKETS * sizeof(uint32_t));
/*
* First stage, fill the buckets
*/
for (i = 0; i < count; i++) {
uint32_t tmp = in[i];
unsigned char rad = (tmp & bitmask) >> (pass * RADIX_SIZE);
buckets[rad]++;
}
/*
* Second stage, make the offset table
*/
for (i = 1; i < RADIX_BUCKETS; i++) {
offsets[i] = offsets[i - 1] + buckets[i - 1];
}
/*
* Finally, copy into the output buffer
*/
for (i = 0; i < count; i++) {
unsigned char rad;
rad = ((uint32_t)in[i] & bitmask) >> (pass * RADIX_SIZE);
out[offsets[rad]++] = in[i];
}
/*
* Flip the input and output buffers for the next pass
*/
uint32_t* tmp = out;
out = in;
in = tmp;
}
/*
* Flip back
*/
#ifdef FLOATFLIP
for (i = 0; i <= count; i++) {
in[i] = in[i] ^ (((in[i] >> 31) - 1) | 0x80000000);
}
#endif
return in;
}
/* -------------------------------------------------------------------------
* Disk IO helper functions
* ------------------------------------------------------------------------- */
/*
* Read data into a buffer, using the optimal blocksize
*/
inline void read_data(int infd, FLOAT *buffer, uint32_t length, int seek,
off_t file_offset) {
int i;
/*
* Seek if they asked us to
*/
if (seek != 0) {
if (lseek(infd,file_offset,SEEK_SET) == -1) {
perror("Error seeking in input file");
exit(-1);
}
}
#ifdef FS_BLOCKSIZE_IO
/*
* Load in all blocks we can load at full blocksize
*/
for (i = 0; i < (length / io_blocksize); i++) {
read(infd,buffer,io_blocksize);
buffer = (FLOAT*)((unsigned char *)buffer + io_blocksize);
}
/*
* Load the last block
*/
read(infd, buffer, length % io_blocksize);
#else
/*
* Just slurp the whole thing in one read()
*/
read(infd,buffer,length);
#endif
}
/*
* Read data into a buffer, using the optimal blocksize
*/
inline void write_data(int outfd, FLOAT *buffer, uint32_t length,
int seek, off_t file_offset) {
int i;
/*
* Seek if they asked us to
*/
if (seek != 0) {
DEBUG("Seeking to %li\n", (unsigned long int)file_offset);
if (lseek(outfd,file_offset,SEEK_SET) == -1) {
perror("Error seeking in output file");
exit(-1);
}
}
#ifdef FS_BLOCKSIZE_IO
/*
* Write all blocks we can load at full blocksize
*/
for (i = 0; i < (length / io_blocksize); i++) {
write(outfd,buffer,io_blocksize);
buffer = (FLOAT*)((unsigned char *)buffer + io_blocksize);
}
/*
* Write the last block
*/
write(outfd, buffer, length % io_blocksize);
#else
/*
* Just write the whole thing in one write()
*/
write(outfd,buffer,length);
#endif
}
/* -------------------------------------------------------------------------
* Other helpers
* ------------------------------------------------------------------------- */
/*
* Round a to the nearest b - round down, but never to 0
*/
int round_down(int a, int b) {
int rounded;
rounded = (a / b) * b;
if (rounded > 0) {
return rounded;
} else {
return a;
}
}