clean up unused method


git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2930 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java
deleted file mode 100644
index dee3a72..0000000
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java
+++ /dev/null
@@ -1,179 +0,0 @@
-package edu.uci.ics.genomix.dataflow;

-

-import java.io.DataInput;

-import java.io.DataOutput;

-import java.io.IOException;

-import java.util.List;

-

-import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;

-import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

-import edu.uci.ics.hyracks.api.comm.IFrameReader;

-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

-import edu.uci.ics.hyracks.api.dataflow.ActivityId;

-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;

-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;

-import edu.uci.ics.hyracks.api.dataflow.TaskId;

-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;

-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;

-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;

-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;

-import edu.uci.ics.hyracks.api.job.JobId;

-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;

-import edu.uci.ics.hyracks.data.std.primitive.LongPointable;

-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;

-import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;

-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;

-import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;

-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;

-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunMerger;

-import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorter;

-

-public class GenKmerDescriptor extends AbstractOperatorDescriptor {

-

-	private static final long serialVersionUID = 1L;

-	private static final int SPLIT_ACTIVITY_ID = 0;

-	private static final int MERGE_ACTIVITY_ID = 1;

-	private final int framesLimit;

-	private final int k;

-

-	public GenKmerDescriptor(IOperatorDescriptorRegistry spec, int framesLimit,

-			int k) {

-		super(spec, 1, 1);

-		this.framesLimit = framesLimit;

-		this.k = k;

-

-		// TODO Auto-generated constructor stub

-		recordDescriptors[0] = new RecordDescriptor(

-				new ISerializerDeserializer[] {

-						Integer64SerializerDeserializer.INSTANCE,

-						ByteSerializerDeserializer.INSTANCE,

-						IntegerSerializerDeserializer.INSTANCE });

-	}

-

-	@Override

-	public void contributeActivities(IActivityGraphBuilder builder) {

-		// TODO Auto-generated method stub

-		SplitActivity sa = new SplitActivity(new ActivityId(odId,

-				SPLIT_ACTIVITY_ID));

-		MergeActivity ma = new MergeActivity(new ActivityId(odId,

-				MERGE_ACTIVITY_ID));

-		builder.addActivity(this, sa);

-		builder.addSourceEdge(0, sa, 0);

-

-		builder.addActivity(this, ma);

-		builder.addTargetEdge(0, ma, 0);

-

-		builder.addBlockingEdge(sa, ma);

-	}

-

-	private class SplitActivity extends AbstractActivityNode {

-		/**

-		 * 

-		 */

-		private static final long serialVersionUID = 1L;

-

-		public SplitActivity(ActivityId activityID) {

-			super(activityID);

-			// TODO Auto-generated constructor stub

-		}

-

-		public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,

-				IRecordDescriptorProvider recordDescProvider, int partition,

-				int nPartitions) throws HyracksDataException {

-			// TODO Auto-generated method stub

-			// IHyracksTaskContext ctx, int k, RecordDescriptor rd_in, int

-			// buffer_size

-			KmerSplitOperatorNodePushable op = new KmerSplitOperatorNodePushable(

-					ctx,

-					k,

-					new RecordDescriptor(

-							new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE }),

-					framesLimit, new TaskId(this.id, partition));

-			return op;

-		}

-	}

-

-	public static class SplitTaskState extends AbstractStateObject {

-		List<IFrameReader> runs;

-

-		public SplitTaskState() {

-		}

-

-		public SplitTaskState(JobId jobId, TaskId taskId,

-				List<IFrameReader> runs) {

-			super(jobId, taskId);

-			this.runs = runs;

-		}

-

-		@Override

-		public void toBytes(DataOutput out) throws IOException {

-

-		}

-

-		@Override

-		public void fromBytes(DataInput in) throws IOException {

-

-		}

-	}

-

-	private class MergeActivity extends AbstractActivityNode {

-

-		/**

-		 * 

-		 */

-		private static final long serialVersionUID = 1L;

-

-		public MergeActivity(ActivityId id) {

-			super(id);

-			// TODO Auto-generated constructor stub

-		}

-

-		@Override

-		public IOperatorNodePushable createPushRuntime(

-				final IHyracksTaskContext ctx,

-				IRecordDescriptorProvider recordDescProvider,

-				final int partition, int nPartitions)

-				throws HyracksDataException {

-			// TODO Auto-generated method stub

-			IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {

-				@Override

-				public void initialize() throws HyracksDataException {

-					SplitTaskState state = (SplitTaskState) ctx

-							.getStateObject(new TaskId(new ActivityId(

-									getOperatorId(), SPLIT_ACTIVITY_ID),

-									partition));

-					// List<IFrameReader> runs = runs = new

-					// LinkedList<IFrameReader>();;

-

-					IBinaryComparator[] comparators = new IBinaryComparator[1];

-					IBinaryComparatorFactory cf = PointableBinaryComparatorFactory

-							.of(LongPointable.FACTORY);

-					comparators[0] = cf.createBinaryComparator();

-

-					// int necessaryFrames = Math.min(runs.size() + 2,

-					// framesLimit);

-

-					FrameSorter frameSorter = new FrameSorter(

-							ctx,

-							new int[] { 0 },

-							new Integer64NormalizedKeyComputerFactory(),

-							new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

-									.of(LongPointable.FACTORY) },

-							recordDescriptors[0]);

-

-					ExternalSortRunMerger merger = new ExternalSortRunMerger(

-							ctx, frameSorter, state.runs, new int[] { 0 },

-							comparators, recordDescriptors[0], framesLimit,

-							writer);

-					merger.process();

-				}

-			};

-			return op;

-		}

-	}

-}

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java
deleted file mode 100644
index efe054b..0000000
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java
+++ /dev/null
@@ -1,259 +0,0 @@
-package edu.uci.ics.genomix.dataflow;

-

-import java.nio.ByteBuffer;

-import java.util.List;

-

-import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;

-import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

-import edu.uci.ics.genomix.dataflow.GenKmerDescriptor.SplitTaskState;

-import edu.uci.ics.hyracks.api.comm.IFrameReader;

-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

-import edu.uci.ics.hyracks.api.dataflow.TaskId;

-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;

-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;

-import edu.uci.ics.hyracks.data.std.primitive.LongPointable;

-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;

-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;

-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;

-

-public class KmerSplitOperatorNodePushable extends

-		AbstractUnaryInputSinkOperatorNodePushable {

-

-	private final int k;

-	private long window;

-

-	private final SplitFrame frameSorter;

-

-	private FrameTupleAccessor accessor;

-	private ArrayTupleBuilder tupleBuilder;

-	private TaskId MytaskId;

-	private IHyracksTaskContext ctx;

-

-	public KmerSplitOperatorNodePushable(IHyracksTaskContext ctx, int k,

-			RecordDescriptor rd_in, int buffer_size, TaskId taskid) {

-

-		tupleBuilder = new ArrayTupleBuilder(3);

-		this.k = k;

-

-		RecordDescriptor rd = new RecordDescriptor(

-				new ISerializerDeserializer[] {

-						Integer64SerializerDeserializer.INSTANCE,

-						ByteSerializerDeserializer.INSTANCE,

-						IntegerSerializerDeserializer.INSTANCE });

-

-		int[] sortFields = { 0 };

-		frameSorter = new SplitFrame(

-				ctx,

-				sortFields,

-				new Integer64NormalizedKeyComputerFactory(),

-				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

-						.of(LongPointable.FACTORY) }, rd, buffer_size);

-

-		accessor = new FrameTupleAccessor(ctx.getFrameSize(), rd_in);

-

-		new FrameTupleAccessor(ctx.getFrameSize(), rd);

-		new FrameTupleAppender(ctx.getFrameSize());

-

-		ByteBuffer.allocate(ctx.getFrameSize());

-

-		// initialize the window

-		window = 0;

-		for (int i = 0; i < k; i++) {

-			window <<= 2;

-			window |= 3;

-		}

-

-		MytaskId = taskid;

-		this.ctx = ctx;

-	}

-

-	@Override

-	public void open() throws HyracksDataException {

-		// TODO Auto-generated method stub

-		// writer.open();

-

-	}

-

-	@Override

-	public void nextFrame(ByteBuffer buffer) {

-		accessor.reset(buffer);

-		int tupleCount = accessor.getTupleCount();

-		ByteBuffer temp_buf = accessor.getBuffer();

-		for (int i = 0; i < tupleCount; i++) {

-			int tupleStartOffset = accessor.getTupleStartOffset(i);

-			int fieldStartOffset = accessor.getFieldStartOffset(i, 0);

-			int loadLength = accessor.getFieldLength(i, 0);

-			// int loadLength = temp_buf.getInt(tupleStartOffset);

-			byte[] read = new byte[loadLength];

-			int slotLength = accessor.getFieldSlotsLength();

-			// temp_buf.position(tupleStartOffset+fieldStartOffset +

-			// accessor.getFieldSlotsLength());

-			int pos = tupleStartOffset + fieldStartOffset + slotLength;

-			// temp_buf

-			try {

-				temp_buf.position(pos);

-				temp_buf.get(read, 0, loadLength);

-				SplitReads(read);

-			} catch (Exception e) {

-				// TODO Auto-generated catch block

-				e.printStackTrace();

-			}

-

-			tupleBuilder.reset();

-

-		}

-	}

-

-	private long CompressKmer(byte[] array, int start, int k) {

-		// a: 00; c: 01; G: 10; T: 11

-		long l = 0;

-		for (int i = start; i < start + k; i++) {

-			l <<= 2;

-			switch (array[start + i]) {

-			case 'A':

-			case 'a':

-				l |= 0;

-				break;

-			case 'C':

-			case 'c':

-				l |= 1;

-				break;

-			case 'G':

-			case 'g':

-				l |= 2;

-				break;

-			case 'T':

-			case 't':

-				l |= 3;

-				break;

-			}

-		}

-		return l;

-	}

-

-	private byte GetBitmap(byte t) {

-		byte r = 0;

-		switch (t) {

-		case 'A':

-		case 'a':

-			r = 1;

-			break;

-		case 'C':

-		case 'c':

-			r = 2;

-			break;

-		case 'G':

-		case 'g':

-			r = 4;

-			break;

-		case 'T':

-		case 't':

-			r = 8;

-			break;

-		}

-		return r;

-	}

-

-	private byte ConvertSymbol(byte t) {

-		byte r = 0;

-		switch (t) {

-		case 'A':

-		case 'a':

-			r = 0;

-			break;

-		case 'C':

-		case 'c':

-			r = 1;

-			break;

-		case 'G':

-		case 'g':

-			r = 2;

-			break;

-		case 'T':

-		case 't':

-			r = 3;

-			break;

-		}

-		return r;

-	}

-

-	private void SplitReads(byte[] array) {

-		try {

-			long l = 0;

-

-			byte pre = 0, next = 0;

-			byte r;

-

-			for (int i = 2; i < array.length - k + 1; i++) {

-				if (2 == i) {

-					l = CompressKmer(array, i, k);

-				} else {

-					l <<= 2;

-					l &= window;

-					l |= ConvertSymbol(array[i + k - 1]);

-					pre = GetBitmap(array[i - 1]);

-				}

-				if (i + k != array.length) {

-					next = GetBitmap(array[i + k]);

-				}

-

-				r = 0;

-				r |= pre;

-				r <<= 4;

-				r |= next;

-

-				/*

-				 * System.out.print(l); System.out.print(' ');

-				 * System.out.print(r); System.out.println();

-				 */

-

-				frameSorter.insertKmer(l, r);

-			}

-		} catch (Exception e) {

-			e.printStackTrace();

-		}

-	}

-

-	@Override

-	public void fail() throws HyracksDataException {

-		// TODO Auto-generated method stub

-

-	}

-

-	@Override

-	public void close() {

-		// TODO Auto-generated method stub

-		try {

-			frameSorter.processLastFrame();

-			SplitTaskState state = new SplitTaskState(ctx.getJobletContext()

-					.getJobId(), MytaskId, frameSorter.GetRuns());

-			ctx.setStateObject(state);

-

-			// writer.close();

-		} catch (Exception e) {

-			// TODO Auto-generated catch block

-			e.printStackTrace();

-		}

-	}

-

-	public List<IFrameReader> GetRuns() {

-		return frameSorter.GetRuns();

-	}

-

-	// for debug

-	/*

-	 * private void DumpBlock(ByteBuffer f){

-	 * 

-	 * int n = f.array().length/13;

-	 * 

-	 * for(int i = 0 ; i < n ; i++){ long t = LongPointable.getLong(f.array(),

-	 * 13 * i); System.out.print(t); System.out.print(' '); }

-	 * System.out.println(); }

-	 */

-}

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java
deleted file mode 100644
index 0c8bd07..0000000
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/*

- * Copyright 2009-2010 by The Regents of the University of California

- * Licensed under the Apache License, Version 2.0 (the "License");

- * you may not use this file except in compliance with the License.

- * you may obtain a copy of the License from

- * 

- *     http://www.apache.org/licenses/LICENSE-2.0

- * 

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-package edu.uci.ics.genomix.dataflow;

-

-import java.nio.ByteBuffer;

-import java.util.LinkedList;

-import java.util.List;

-

-import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

-import edu.uci.ics.hyracks.api.comm.IFrameReader;

-import edu.uci.ics.hyracks.api.comm.IFrameWriter;

-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;

-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;

-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;

-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;

-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

-import edu.uci.ics.hyracks.api.io.FileReference;

-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;

-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;

-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;

-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;

-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortRunGenerator;

-import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;

-import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;

-

-public class SplitFrame {

-

-	private static int HASH_SIZE = 4096;

-	private final SerializableHashTable table;

-	private final TuplePointer tempPointer;

-	private ArrayTupleBuilder tupleBuilder;

-	private final int buf_size;

-

-	private final IHyracksTaskContext ctx;

-	private final int[] sortFields;

-	private final INormalizedKeyComputer nkc;

-	private final IBinaryComparator[] comparators;

-	private final ByteBuffer[] buffers;

-

-	private final FrameTupleAccessor fta1;

-	private final FrameTupleAccessor fta2;

-

-	private final FrameTupleAppender appender;

-

-	private final ByteBuffer outFrame;

-

-	private int dataFrameCount;

-	private int[] tPointers;

-	private int tupleCount;

-	private final List<IFrameReader> runs;

-	private int flushCount;

-	private RecordDescriptor recordDescriptor;

-

-	private int FrameTupleCount;

-

-	public SplitFrame(IHyracksTaskContext ctx, int[] sortFields,

-			INormalizedKeyComputerFactory firstKeyNormalizerFactory,

-			IBinaryComparatorFactory[] comparatorFactories,

-			RecordDescriptor recordDescriptor, int buf_size) {

-		this.ctx = ctx;

-		this.sortFields = sortFields;

-		this.recordDescriptor = recordDescriptor;

-

-		nkc = firstKeyNormalizerFactory == null ? null

-				: firstKeyNormalizerFactory.createNormalizedKeyComputer();

-		comparators = new IBinaryComparator[comparatorFactories.length];

-		for (int i = 0; i < comparatorFactories.length; ++i) {

-			comparators[i] = comparatorFactories[i].createBinaryComparator();

-		}

-		fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);

-		fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);

-		appender = new FrameTupleAppender(ctx.getFrameSize());

-		outFrame = ctx.allocateFrame();

-		table = new SerializableHashTable(HASH_SIZE, ctx);

-		dataFrameCount = 0;

-

-		tempPointer = new TuplePointer();

-		tupleBuilder = new ArrayTupleBuilder(3);

-		this.buf_size = buf_size;

-		buffers = new ByteBuffer[buf_size];

-		for (int i = 0; i < buf_size; i++) {

-			buffers[i] = ByteBuffer.allocate(ctx.getFrameSize());

-		}

-		appender.reset(buffers[0], true);

-		flushCount = 0;

-		runs = new LinkedList<IFrameReader>();

-		FrameTupleCount = 0;

-	}

-

-	public void reset() {

-		dataFrameCount = 0;

-		tupleCount = 0;

-		appender.reset(buffers[0], true);

-	}

-

-	public int getFrameCount() {

-		return dataFrameCount;

-	}

-

-	private void SearchHashTable(long entry, TuplePointer dataPointer) {

-		int offset = 0;

-		int tp = (int) (entry % HASH_SIZE);

-		if (tp < 0) {

-			tp = -tp;

-		}

-		do {

-			table.getTuplePointer(tp, offset, dataPointer);// what is the offset

-															// mean?

-			if (dataPointer.frameIndex < 0 || dataPointer.tupleIndex < 0)

-				break;

-			int bIndex = dataPointer.frameIndex;

-			int tIndex = dataPointer.tupleIndex;

-			fta1.reset(buffers[bIndex]);

-

-			/*

-			 * System.out.print("a:"); System.out.print(tIndex);

-			 * System.out.print(" b"); System.out.print(fta1.getTupleCount());

-			 * System.out.println();

-			 */

-

-			int tupleOffset = fta1.getTupleStartOffset(tIndex);

-			int fieldOffset = fta1.getFieldStartOffset(tIndex, 0);

-			int slotLength = fta1.getFieldSlotsLength();

-			int pos = tupleOffset + fieldOffset + slotLength;

-			long l = buffers[bIndex].getLong(pos);

-			if (l == entry) {

-				break;

-			}

-			offset += 1;

-		} while (true);

-	}

-

-	private void InsertHashTable(long entry, int frame_id, int tuple_id) {

-

-		tempPointer.frameIndex = frame_id;

-		tempPointer.tupleIndex = tuple_id;

-

-		// System.out.print(frame_id);

-		// System.out.print(' ');

-		// System.out.println(tuple_id);

-

-		int tp = (int) (entry % HASH_SIZE);

-		if (tp < 0) {

-			tp = -tp;

-		}

-		table.insert(tp, tempPointer);

-

-	}

-

-	public void insertKmer(long l, byte r) {

-		try {

-			SearchHashTable(l, tempPointer);

-			if (tempPointer.frameIndex != -1 && tempPointer.tupleIndex != -1) {

-				fta1.reset(buffers[tempPointer.frameIndex]);

-				int tStart = fta1.getTupleStartOffset(tempPointer.tupleIndex);

-				int f0StartRel = fta1.getFieldStartOffset(

-						tempPointer.tupleIndex, 1);

-				int slotLength = fta1.getFieldSlotsLength();

-				int pos = f0StartRel + tStart + slotLength;

-

-				buffers[tempPointer.frameIndex].array()[pos] |= r;

-				buffers[tempPointer.frameIndex].position(pos + 1);

-				int temp_int = buffers[tempPointer.frameIndex].getInt();

-				temp_int += 1;

-				buffers[tempPointer.frameIndex].position(pos + 1);

-				buffers[tempPointer.frameIndex].putInt(temp_int);

-			} else {

-				tupleBuilder.reset();

-				tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE,

-						l);

-				tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);

-				tupleBuilder

-						.addField(IntegerSerializerDeserializer.INSTANCE, 1);

-

-				/*

-				 * System.out.print(l); System.out.print(' ');

-				 * System.out.print(r); System.out.println();

-				 */

-				boolean b = appender.append(tupleBuilder.getFieldEndOffsets(),

-						tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());

-

-				if (!b) {

-					dataFrameCount++;

-					FrameTupleCount = 0;

-					if (dataFrameCount < buf_size) {

-						appender.reset(buffers[dataFrameCount], true);

-					} else {

-						sortFrames();

-						FileReference file = ctx.getJobletContext()

-								.createManagedWorkspaceFile(

-										ExternalSortRunGenerator.class

-												.getSimpleName());

-						RunFileWriter writer = new RunFileWriter(file,

-								ctx.getIOManager());

-						writer.open();

-						try {

-							flushCount += 1;

-							flushFrames(writer);

-						} finally {

-							writer.close();

-						}

-						runs.add(writer.createReader());

-						dataFrameCount = 0;

-						appender.reset(buffers[dataFrameCount], true);

-					}

-					boolean tb = appender.append(

-							tupleBuilder.getFieldEndOffsets(),

-							tupleBuilder.getByteArray(), 0,

-							tupleBuilder.getSize());

-					if (!tb) {

-						throw new HyracksDataException(

-								"Failed to copy an record into a frame: the record size is too large");

-					}

-				}

-				InsertHashTable(l, dataFrameCount, FrameTupleCount);

-				FrameTupleCount += 1;

-			}

-		} catch (HyracksDataException e) {

-			// TODO Auto-generated catch block

-			e.printStackTrace();

-		}

-	}

-

-	public void sortFrames() {

-		int nBuffers = dataFrameCount;

-		tupleCount = 0;

-		for (int i = 0; i < nBuffers; ++i) {

-			fta1.reset(buffers[i]);

-			tupleCount += fta1.getTupleCount();

-		}

-		int sfIdx = sortFields[0];

-		tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4]

-				: tPointers;

-		int ptr = 0;

-		for (int i = 0; i < nBuffers; ++i) {

-			fta1.reset(buffers[i]);

-			int tCount = fta1.getTupleCount();

-			byte[] array = fta1.getBuffer().array();

-			for (int j = 0; j < tCount; ++j) {

-				int tStart = fta1.getTupleStartOffset(j);

-				int tEnd = fta1.getTupleEndOffset(j);

-				tPointers[ptr * 4] = i;

-				tPointers[ptr * 4 + 1] = tStart;

-				tPointers[ptr * 4 + 2] = tEnd;

-				int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);

-				int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);

-				int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();

-				tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array,

-						f0Start, f0EndRel - f0StartRel);

-				++ptr;

-			}

-		}

-		if (tupleCount > 0) {

-			sort(tPointers, 0, tupleCount);

-		}

-

-		DumpAllBuffers();

-		// point the pointer to the first one

-		dataFrameCount = 0;

-	}

-

-	public void flushFrames(IFrameWriter writer) throws HyracksDataException {

-		appender.reset(outFrame, true);

-		for (int ptr = 0; ptr < tupleCount; ++ptr) {

-			int i = tPointers[ptr * 4];

-			int tStart = tPointers[ptr * 4 + 1];

-			int tEnd = tPointers[ptr * 4 + 2];

-			ByteBuffer buffer = buffers[i];

-			fta1.reset(buffer);

-			if (!appender.append(fta1, tStart, tEnd)) {

-				FrameUtils.flushFrame(outFrame, writer);

-				appender.reset(outFrame, true);

-				if (!appender.append(fta1, tStart, tEnd)) {

-					throw new IllegalStateException();

-				}

-			}

-		}

-		if (appender.getTupleCount() > 0) {

-			FrameUtils.flushFrame(outFrame, writer);

-		}

-	}

-

-	private void sort(int[] tPointers, int offset, int length) {

-		int m = offset + (length >> 1);

-		int mi = tPointers[m * 4];

-		int mj = tPointers[m * 4 + 1];

-		int mv = tPointers[m * 4 + 3];

-

-		int a = offset;

-		int b = a;

-		int c = offset + length - 1;

-		int d = c;

-		while (true) {

-			while (b <= c) {

-				int cmp = compare(tPointers, b, mi, mj, mv);

-				if (cmp > 0) {

-					break;

-				}

-				if (cmp == 0) {

-					swap(tPointers, a++, b);

-				}

-				++b;

-			}

-			while (c >= b) {

-				int cmp = compare(tPointers, c, mi, mj, mv);

-				if (cmp < 0) {

-					break;

-				}

-				if (cmp == 0) {

-					swap(tPointers, c, d--);

-				}

-				--c;

-			}

-			if (b > c)

-				break;

-			swap(tPointers, b++, c--);

-		}

-

-		int s;

-		int n = offset + length;

-		s = Math.min(a - offset, b - a);

-		vecswap(tPointers, offset, b - s, s);

-		s = Math.min(d - c, n - d - 1);

-		vecswap(tPointers, b, n - s, s);

-

-		if ((s = b - a) > 1) {

-			sort(tPointers, offset, s);

-		}

-		if ((s = d - c) > 1) {

-			sort(tPointers, n - s, s);

-		}

-	}

-

-	private void swap(int x[], int a, int b) {

-		for (int i = 0; i < 4; ++i) {

-			int t = x[a * 4 + i];

-			x[a * 4 + i] = x[b * 4 + i];

-			x[b * 4 + i] = t;

-		}

-	}

-

-	private void vecswap(int x[], int a, int b, int n) {

-		for (int i = 0; i < n; i++, a++, b++) {

-			swap(x, a, b);

-		}

-	}

-

-	private int compare(int[] tPointers, int tp1, int tp2i, int tp2j, int tp2v) {

-		int i1 = tPointers[tp1 * 4];

-		int j1 = tPointers[tp1 * 4 + 1];

-		int v1 = tPointers[tp1 * 4 + 3];

-		if (v1 != tp2v) {

-			return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1

-					: 1;

-		}

-		int i2 = tp2i;

-		int j2 = tp2j;

-		ByteBuffer buf1 = buffers[i1];

-		ByteBuffer buf2 = buffers[i2];

-		byte[] b1 = buf1.array();

-		byte[] b2 = buf2.array();

-		fta1.reset(buf1);

-		fta2.reset(buf2);

-		for (int f = 0; f < comparators.length; ++f) {

-			int fIdx = sortFields[f];

-			int f1Start = fIdx == 0 ? 0 : buf1.getInt(j1 + (fIdx - 1) * 4);

-			int f1End = buf1.getInt(j1 + fIdx * 4);

-			int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;

-			int l1 = f1End - f1Start;

-			int f2Start = fIdx == 0 ? 0 : buf2.getInt(j2 + (fIdx - 1) * 4);

-			int f2End = buf2.getInt(j2 + fIdx * 4);

-			int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;

-			int l2 = f2End - f2Start;

-			int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);

-			if (c != 0) {

-				return c;

-			}

-		}

-		return 0;

-	}

-

-	public void close() {

-		// this.buffers.clear();

-	}

-

-	public int getFlushCount() {

-		return flushCount;

-	}

-

-	/*

-	 * public void AddRuns(RunFileWriter r){ try { runs.add(r.createReader()); }

-	 * catch (HyracksDataException e) { // TODO Auto-generated catch block

-	 * e.printStackTrace(); } }

-	 */

-

-	public List<IFrameReader> GetRuns() {

-		return runs;

-	}

-

-	private void DumpAllBuffers() {

-		FrameTupleAccessor tfa = new FrameTupleAccessor(ctx.getFrameSize(),

-				recordDescriptor);

-

-		for (int i = 0; i < dataFrameCount; i++) {

-			tfa.reset(buffers[i]);

-			int t = tfa.getTupleCount();

-			for (int j = 0; j < t; j++) {

-				int tupleOffset = tfa.getTupleStartOffset(j);

-

-				int r = tfa.getFieldStartOffset(j, 0);

-				int pos = tupleOffset + r + tfa.getFieldSlotsLength();

-				long l = buffers[i].getLong(pos);

-				System.out.print(l);

-				System.out.print(' ');

-

-				r = tfa.getFieldStartOffset(j, 1);

-				pos = tupleOffset + r + tfa.getFieldSlotsLength();

-				byte b = buffers[i].array()[pos];

-				System.out.print(b);

-				System.out.print(' ');

-

-				r = tfa.getFieldStartOffset(j, 2);

-				pos = tupleOffset + r + tfa.getFieldSlotsLength();

-				int o = buffers[i].getInt(pos);

-				System.out.print(o);

-				System.out.print(' ');

-

-				System.out.println();

-			}

-		}

-		System.out.println("---------------------------------");

-	}

-

-	// functions for dubugging

-	// private void DumpBuffer(byte[] f) {

-	// int n = f.length;

-	//

-	// int count = 0;

-	// for (int i = 0; i < n; i++) {

-	// if (i % 13 == 0) {

-	// if (count != 0) {

-	// System.out.print(")(");

-	// } else {

-	// System.out.print("(");

-	// }

-	// System.out.print(count);

-	// System.out.print(':');

-	// count += 1;

-	// }

-	// System.out.print(f[i]);

-	// System.out.print(' ');

-	// }

-	// System.out.println(')');

-	// }

-

-	public void processLastFrame() {

-		sortFrames();

-		FileReference file;

-

-		DumpAllBuffers();

-		try {

-			file = ctx.getJobletContext().createManagedWorkspaceFile(

-					ExternalSortRunGenerator.class.getSimpleName());

-			RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());

-			writer.open();

-			try {

-				flushCount += 1;

-				flushFrames(writer);

-			} finally {

-				writer.close();

-			}

-			runs.add(writer.createReader());

-		} catch (HyracksDataException e) {

-			// TODO Auto-generated catch block

-			e.printStackTrace();

-		}

-		// frameSorter.AddRuns((RunFileWriter) writer);

-

-	}

-}
\ No newline at end of file