Merge branch 'fullstack_genomix' of https://code.google.com/p/hyracks into fullstack_genomix
diff --git a/genomix/genomix-hyracks/pom.xml b/genomix/genomix-hyracks/pom.xml
index 0d31998..b8dc757 100644
--- a/genomix/genomix-hyracks/pom.xml
+++ b/genomix/genomix-hyracks/pom.xml
@@ -241,6 +241,13 @@
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>0.2.5-SNAPSHOT</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
 			<artifactId>genomix-data</artifactId>
 			<version>0.2.5-SNAPSHOT</version>
 			<type>jar</type>
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
new file mode 100644
index 0000000..39cc6fc
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenStatistic.java
@@ -0,0 +1,179 @@
+package edu.uci.ics.genomix.job;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.util.ByteComparatorFactory;
+import edu.uci.ics.genomix.util.StatCountAggregateFactory;
+import edu.uci.ics.genomix.util.StatReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.util.StatSumAggregateFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
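+/**
+ * Generates a Hyracks job that reads the k-mer/adjacency output back from HDFS
+ * and computes two statistics over it: a histogram of node degrees and a
+ * histogram of k-mer counts. Each statistic is aggregated locally per
+ * partition and then merged on a single node before being written out.
+ */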
+public class JobGenStatistic extends JobGen {
+	private int kmerSize;
+	private JobConf hadoopjob;
+	private RecordDescriptor readOutputRec;
+	private String[] ncNodeNames;
+	private Scheduler scheduler;
+	private RecordDescriptor combineOutputRec;
+
+	public JobGenStatistic(GenomixJob job) {
+		super(job);
+	}
+
+	@Override
+	protected void initJobConfiguration() {
+		kmerSize = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+		hadoopjob = new JobConf(conf);
+		hadoopjob.setInputFormat(SequenceFileInputFormat.class);
+	}
+
+	@Override
+	public JobSpecification generateJob() throws HyracksException {
+		int[] degreeFields = { 0 };
+		int[] countFields = { 1 };
+		JobSpecification jobSpec = new JobSpecification();
+		/** specify the record fields after read */
+		readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+				null, ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
+		combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+				null, ByteSerializerDeserializer.INSTANCE });
+		/** the reader */
+		HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				readOperator, ncNodeNames);
+
+		/** the combiner aggregator */
+		AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(
+				jobSpec, degreeFields, readOperator);
+		AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(
+				jobSpec, countFields, readOperator);
+
+		/** the final aggregator */
+		AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(
+				jobSpec, degreeFields, degreeLocal);
+		AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(
+				jobSpec, countFields, countLocal);
+		
+		/** writer */
+		AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(
+				jobSpec, degreeFields, degreeMerger);
+		AbstractFileWriteOperatorDescriptor writeCount = connectWriter(
+				jobSpec, countFields, countMerger);
+		jobSpec.addRoot(writeDegree);
+		jobSpec.addRoot(writeCount);
+		return jobSpec;
+	}
+
+	private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
+			throws HyracksDataException {
+		try {
+
+			InputSplit[] splits = hadoopjob.getInputFormat().getSplits(
+					hadoopjob, ncNodeNames.length);
+
+			String[] readSchedule = scheduler.getLocationConstraints(splits);
+			return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec,
+					hadoopjob, splits, readSchedule,
+					new StatReadsKeyValueParserFactory());
+		} catch (Exception e) {
+			throw new HyracksDataException(e);
+		}
+	}
+
+	private ExternalGroupOperatorDescriptor newExternalGroupby(
+			JobSpecification jobSpec, int[] keyFields,
+			IAggregatorDescriptorFactory aggregatorFactory) {
+		return new ExternalGroupOperatorDescriptor(jobSpec, keyFields,
+				GenomixJob.DEFAULT_FRAME_LIMIT,
+				new IBinaryComparatorFactory[] { new ByteComparatorFactory() },
+				null, aggregatorFactory, new StatSumAggregateFactory(),
+				combineOutputRec, new HashSpillableTableFactory(
+						new FieldHashPartitionComputerFactory(keyFields,
+								new IBinaryHashFunctionFactory[] { new ByteComparatorFactory() }),
+						GenomixJob.DEFAULT_TABLE_SIZE), true);
+	}
+
+	private AbstractOperatorDescriptor connectLocalAggregateByField(
+			JobSpecification jobSpec, int[] fields,
+			HDFSReadOperatorDescriptor readOperator) {
+		AbstractOperatorDescriptor localAggregator = newExternalGroupby(
+				jobSpec, fields, new StatCountAggregateFactory());
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				localAggregator, ncNodeNames);
+		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
+				jobSpec);
+		jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
+		return localAggregator;
+	}
+
+	private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec,
+			int[] fields, AbstractOperatorDescriptor localAggregator) {
+		AbstractOperatorDescriptor finalAggregator = newExternalGroupby(
+				jobSpec, fields, new StatSumAggregateFactory());
+		// only one reducer is needed; its node is chosen from the key field so
+		// the two statistics can merge on different node controllers
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				finalAggregator, ncNodeNames[fields[0] % ncNodeNames.length]);
+		IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(
+				jobSpec,
+				new ITuplePartitionComputerFactory(){
+					private static final long serialVersionUID = 1L;
+					@Override
+					public ITuplePartitionComputer createPartitioner() {
+						return new ITuplePartitionComputer(){
+							@Override
+							public int partition(IFrameTupleAccessor accessor,
+									int tIndex, int nParts)
+									throws HyracksDataException {
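+								// single-reducer merge: every tuple goes to partition 0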
+								return 0;
+							}
+						};
+					}
+				},
+				fields, 
+				new IBinaryComparatorFactory[]{new ByteComparatorFactory()});
+		jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
+		return finalAggregator;
+	}
+	
+	private AbstractFileWriteOperatorDescriptor connectWriter(
+			JobSpecification jobSpec, int[] fields,
+			AbstractOperatorDescriptor finalAggregator) {
+		LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(
+				jobSpec, null);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				writeOperator, ncNodeNames[fields[0] % ncNodeNames.length]);
+
+		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
+				jobSpec);
+		jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
+		return writeOperator;
+	}
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
new file mode 100644
index 0000000..b469be0
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/ByteComparatorFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.genomix.util;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
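+/**
+ * Comparator and hash-function factory for single-byte keys: both the
+ * comparison and the hash inspect only the first byte of the field.
+ */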
+public class ByteComparatorFactory implements IBinaryComparatorFactory, IBinaryHashFunctionFactory {
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public IBinaryComparator createBinaryComparator() {
+		return new IBinaryComparator(){
+
+			@Override
+			public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
+					int l2) {
+				return b1[s1]-b2[s2];
+			}
+			
+		};
+	}
+
+	@Override
+	public IBinaryHashFunction createBinaryHashFunction() {
+		return new IBinaryHashFunction(){
+
+			@Override
+			public int hash(byte[] bytes, int offset, int length) {
+				return bytes[offset];
+			}
+			
+		};
+	}
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
new file mode 100644
index 0000000..49a7453
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatCountAggregateFactory.java
@@ -0,0 +1,121 @@
+package edu.uci.ics.genomix.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
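+/**
+ * Local group-by aggregator that counts the tuples in each group. The running
+ * count is stored as an int directly in the partial-result tuple (field 1),
+ * so no separate aggregate state is allocated.
+ */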
+public class StatCountAggregateFactory implements
+		IAggregatorDescriptorFactory {
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+
+	public class CountAggregator implements IAggregatorDescriptor {
+
+		@Override
+		public AggregateState createAggregateStates() {
+			// no separate state object: the running count lives in the partial-result tuple
+			return null;
+		}
+
+		@Override
+		public void init(ArrayTupleBuilder tupleBuilder,
+				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+				throws HyracksDataException {
+			int count = 1;
+			DataOutput fieldOutput = tupleBuilder.getDataOutput();
+			try {
+				fieldOutput.writeInt(count);
+				tupleBuilder.addFieldEndOffset();
+			} catch (IOException e) {
+				throw new HyracksDataException(
+						"I/O exception when initializing the aggregator.");
+			}
+		}
+
+		@Override
+		public void reset() {
+			// nothing to reset: no aggregate state is kept
+		}
+
+		@Override
+		public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+				IFrameTupleAccessor stateAccessor, int stateTupleIndex,
+				AggregateState state) throws HyracksDataException {
+			int count = 1;
+
+			int statetupleOffset = stateAccessor
+					.getTupleStartOffset(stateTupleIndex);
+			int countfieldStart = stateAccessor.getFieldStartOffset(
+					stateTupleIndex, 1);
+			int countoffset = statetupleOffset
+					+ stateAccessor.getFieldSlotsLength() + countfieldStart;
+
+			byte[] data = stateAccessor.getBuffer().array();
+			count += IntegerSerializerDeserializer.getInt(data, countoffset);
+			IntegerSerializerDeserializer.putInt(count, data, countoffset);
+		}
+
+		@Override
+		public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
+				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+				throws HyracksDataException {
+			int count = getCount(accessor, tIndex);
+			DataOutput fieldOutput = tupleBuilder.getDataOutput();
+			try {
+				fieldOutput.writeInt(count);
+				tupleBuilder.addFieldEndOffset();
+			} catch (IOException e) {
+				throw new HyracksDataException(
+						"I/O exception when writing aggregation to the output buffer.");
+			}
+
+		}
+
+		protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
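+			// absolute offset of the count (field 1) = tuple start + field-slot area + field's relative start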
+			int tupleOffset = accessor.getTupleStartOffset(tIndex);
+			int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+			int countoffset = tupleOffset + accessor.getFieldSlotsLength()
+					+ fieldStart;
+			byte[] data = accessor.getBuffer().array();
+
+			return IntegerSerializerDeserializer.getInt(data, countoffset);
+		}
+
+		@Override
+		public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
+				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+				throws HyracksDataException {
+			outputPartialResult(tupleBuilder, accessor, tIndex, state);
+		}
+
+		@Override
+		public void close() {
+			// nothing to close
+		}
+
+	}
+
+	@Override
+	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+			RecordDescriptor inRecordDescriptor,
+			RecordDescriptor outRecordDescriptor, int[] keyFields,
+			int[] keyFieldsInPartialResults) throws HyracksDataException {
+		return new CountAggregator();
+	}
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
new file mode 100644
index 0000000..4c2205f
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatReadsKeyValueParserFactory.java
@@ -0,0 +1,85 @@
+package edu.uci.ics.genomix.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.BytesWritable;
+
+import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.genomix.type.KmerUtil;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
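+/**
+ * Turns each (k-mer, KmerCountValue) pair read from HDFS into a (degree,
+ * count) tuple. The degree byte encodes both directions as
+ * inDegree * 10 + outDegree, which is unambiguous because each degree is at
+ * most 4 for DNA data.
+ */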
+public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<BytesWritable,KmerCountValue> {
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public IKeyValueParser<BytesWritable,KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
+			throws HyracksDataException {
+		
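+		// a single output frame is reused for all records; it is flushed when full and again on close()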
+		final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+		final ByteBuffer outputBuffer = ctx.allocateFrame();
+		final FrameTupleAppender outputAppender = new FrameTupleAppender(
+				ctx.getFrameSize());
+		outputAppender.reset(outputBuffer, true);
+		
+		return new IKeyValueParser<BytesWritable,KmerCountValue>(){
+
+			@Override
+			public void open(IFrameWriter writer) throws HyracksDataException {
+				// no per-task setup is needed
+			}
+
+			@Override
+			public void parse(BytesWritable key, KmerCountValue value,
+					IFrameWriter writer) throws HyracksDataException {
+				byte adjMap = value.getAdjBitMap();
+				byte count = value.getCount();
+				insertToFrame(
+						(byte) (KmerUtil.inDegree(adjMap) * 10 + KmerUtil.outDegree(adjMap)),
+						count, writer);
+			}
+
+			@Override
+			public void close(IFrameWriter writer) throws HyracksDataException {
+				FrameUtils.flushFrame(outputBuffer, writer);
+			}
+			
+			private void insertToFrame(byte degree, byte count,
+					IFrameWriter writer) {
+				try {
+					tupleBuilder.reset();
+					tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,degree);
+					tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,count);
+
+					if (!outputAppender.append(
+							tupleBuilder.getFieldEndOffsets(),
+							tupleBuilder.getByteArray(), 0,
+							tupleBuilder.getSize())) {
+						FrameUtils.flushFrame(outputBuffer, writer);
+						outputAppender.reset(outputBuffer, true);
+						if (!outputAppender.append(
+								tupleBuilder.getFieldEndOffsets(),
+								tupleBuilder.getByteArray(), 0,
+								tupleBuilder.getSize())) {
+							throw new IllegalStateException(
+									"Failed to copy a record into a frame: the record size is too large.");
+						}
+					}
+				} catch (Exception e) {
+					throw new IllegalStateException(e);
+				}
+			}
+		};
+	}
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
new file mode 100644
index 0000000..ce00667
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/util/StatSumAggregateFactory.java
@@ -0,0 +1,122 @@
+package edu.uci.ics.genomix.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
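+/**
+ * Final-merge aggregator that sums the partial counts produced by
+ * StatCountAggregateFactory; like the count aggregator, it keeps the running
+ * sum in field 1 of the partial-result tuple.
+ */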
+public class StatSumAggregateFactory implements IAggregatorDescriptorFactory {
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+
+	public class DistributeAggregatorDescriptor implements
+			IAggregatorDescriptor {
+
+		@Override
+		public AggregateState createAggregateStates() {
+			// no separate state object: the running sum lives in the partial-result tuple
+			return null;
+		}
+
+		protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
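+			// absolute offset of the count (field 1) = tuple start + field-slot area + field's relative start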
+			int tupleOffset = accessor.getTupleStartOffset(tIndex);
+			int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+			int countoffset = tupleOffset + accessor.getFieldSlotsLength()
+					+ fieldStart;
+			byte[] data = accessor.getBuffer().array();
+			return IntegerSerializerDeserializer.getInt(data, countoffset);
+		}
+
+		@Override
+		public void init(ArrayTupleBuilder tupleBuilder,
+				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+				throws HyracksDataException {
+			int count = getCount(accessor, tIndex);
+
+			DataOutput fieldOutput = tupleBuilder.getDataOutput();
+			try {
+				fieldOutput.writeInt(count);
+				tupleBuilder.addFieldEndOffset();
+			} catch (IOException e) {
+				throw new HyracksDataException(
+						"I/O exception when initializing the aggregator.");
+			}
+		}
+
+		@Override
+		public void reset() {
+			// nothing to reset: no aggregate state is kept
+		}
+
+		@Override
+		public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+				IFrameTupleAccessor stateAccessor, int stateTupleIndex,
+				AggregateState state) throws HyracksDataException {
+			int count = getCount(accessor, tIndex);
+
+			int statetupleOffset = stateAccessor
+					.getTupleStartOffset(stateTupleIndex);
+			int countfieldStart = stateAccessor.getFieldStartOffset(
+					stateTupleIndex, 1);
+			int countoffset = statetupleOffset
+					+ stateAccessor.getFieldSlotsLength() + countfieldStart;
+
+			byte[] data = stateAccessor.getBuffer().array();
+			count += IntegerSerializerDeserializer.getInt(data, countoffset);
+			IntegerSerializerDeserializer.putInt(count, data, countoffset);
+		}
+
+		@Override
+		public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
+				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+				throws HyracksDataException {
+			int count = getCount(accessor, tIndex);
+			DataOutput fieldOutput = tupleBuilder.getDataOutput();
+			try {
+				fieldOutput.writeInt(count);
+				tupleBuilder.addFieldEndOffset();
+			} catch (IOException e) {
+				throw new HyracksDataException(
+						"I/O exception when writing aggregation to the output buffer.");
+			}
+
+		}
+
+		@Override
+		public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
+				IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+				throws HyracksDataException {
+			outputPartialResult(tupleBuilder, accessor, tIndex, state);
+
+		}
+
+		@Override
+		public void close() {
+			// nothing to close
+		}
+
+	}
+
+	@Override
+	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+			RecordDescriptor inRecordDescriptor,
+			RecordDescriptor outRecordDescriptor, int[] keyFields,
+			int[] keyFieldsInPartialResults) throws HyracksDataException {
+		return new DistributeAggregatorDescriptor();
+	}
+
+}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/HyracksUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/HyracksUtils.java
deleted file mode 100644
index f1b2008..0000000
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/HyracksUtils.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.genomix.example.jobrun;
-import java.util.EnumSet;
-
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
-import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
-import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-
-public class HyracksUtils {
-
-	public static final String NC1_ID = "nc1";
-	public static final String NC2_ID = "nc2";
-
-	public static final int DEFAULT_HYRACKS_CC_PORT = 1099;
-	public static final int TEST_HYRACKS_CC_PORT = 1099;
-	public static final int TEST_HYRACKS_CC_CLIENT_PORT = 2099;
-	public static final String CC_HOST = "localhost";
-
-	public static final int FRAME_SIZE = 65536;
-
-	private static ClusterControllerService cc;
-	private static NodeControllerService nc1;
-	private static NodeControllerService nc2;
-	private static IHyracksClientConnection hcc;
-
-	public static void init() throws Exception {
-		CCConfig ccConfig = new CCConfig();
-		ccConfig.clientNetIpAddress = CC_HOST;
-		ccConfig.clusterNetIpAddress = CC_HOST;
-		ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
-		ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
-		ccConfig.defaultMaxJobAttempts = 0;
-		ccConfig.jobHistorySize = 0;
-		ccConfig.profileDumpPeriod = -1;
-
-		// cluster controller
-		cc = new ClusterControllerService(ccConfig);
-		cc.start();
-
-		// two node controllers
-		NCConfig ncConfig1 = new NCConfig();
-		ncConfig1.ccHost = "localhost";
-		ncConfig1.clusterNetIPAddress = "localhost";
-		ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
-		ncConfig1.dataIPAddress = "127.0.0.1";
-		ncConfig1.datasetIPAddress = "127.0.0.1";
-		ncConfig1.nodeId = NC1_ID;
-		nc1 = new NodeControllerService(ncConfig1);
-		nc1.start();
-
-		NCConfig ncConfig2 = new NCConfig();
-		ncConfig2.ccHost = "localhost";
-		ncConfig2.clusterNetIPAddress = "localhost";
-		ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
-		ncConfig2.dataIPAddress = "127.0.0.1";
-		ncConfig2.datasetIPAddress = "127.0.0.1";
-		ncConfig2.nodeId = NC2_ID;
-		nc2 = new NodeControllerService(ncConfig2);
-		nc2.start();
-
-		// hyracks connection
-		hcc = new HyracksConnection(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
-	}
-
-	public static void deinit() throws Exception {
-		nc2.stop();
-		nc1.stop();
-		cc.stop();
-	}
-
-	public static void runJob(JobSpecification spec, String appName)
-			throws Exception {
-		spec.setFrameSize(FRAME_SIZE);
-		JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
-		hcc.waitForCompletion(jobId);
-	}
-
-}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index de68f39..ba0e5b9 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -30,12 +30,13 @@
 import edu.uci.ics.genomix.job.GenomixJob;
 import edu.uci.ics.genomix.type.Kmer;
 import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.genomix.example.jobrun.TestUtils;
 
 public class JobRunTest {
 	private static final String ACTUAL_RESULT_DIR = "actual";
 	private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
 
-	private static final String DATA_PATH = "src/test/resources/data/mergeTest/ThreeKmer";
+	private static final String DATA_PATH = "src/test/resources/data/webmap/text.txt";
 	private static final String HDFS_INPUT_PATH = "/webmap";
 	private static final String HDFS_OUTPUT_PATH = "/webmap_result";
 
@@ -59,7 +60,7 @@
 	@Before
 	public void setUp() throws Exception {
 		cleanupStores();
-		HyracksUtils.init();
+		edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.init();
 		FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
 		FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
 		startHDFS();
@@ -68,8 +69,8 @@
 		FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
 
 		conf.setInt(GenomixJob.KMER_LENGTH, 5);
-		driver = new Driver(HyracksUtils.CC_HOST,
-				HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT,
+		driver = new Driver(edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.CC_HOST,
+				edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT,
 				numPartitionPerMachine);
 	}
 
@@ -94,7 +95,7 @@
 		Path dest = new Path(HDFS_INPUT_PATH);
 		Path result = new Path(HDFS_OUTPUT_PATH);
 		dfs.mkdirs(dest);
-		// dfs.mkdirs(result);
+		//dfs.mkdirs(result);
 		dfs.copyFromLocalFile(src, dest);
 
 		DataOutputStream confOutput = new DataOutputStream(
@@ -116,7 +117,7 @@
 	}
 
 	@Test
-	public void TestAll() throws Exception {
+	public void TestAll() throws Exception{
 		cleanUpReEntry();
 		TestExternalGroupby();
 		cleanUpReEntry();
@@ -131,7 +132,7 @@
 		cleanUpReEntry();
 		TestHybridReversedGroupby();
 	}
-
+	
 	public void TestExternalGroupby() throws Exception {
 		conf.set(GenomixJob.GROUPBY_TYPE, "external");
 		System.err.println("Testing ExternalGroupBy");
@@ -152,24 +153,22 @@
 		driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
 		Assert.assertEquals(true, checkResults(EXPECTED_PATH));
 	}
-
-	public void TestExternalReversedGroupby() throws Exception {
+	
+	public void TestExternalReversedGroupby() throws Exception{
 		conf.set(GenomixJob.GROUPBY_TYPE, "external");
 		conf.setBoolean(GenomixJob.REVERSED_KMER, true);
 		System.err.println("Testing ExternalGroupBy + Reversed");
 		driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
 		Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
 	}
-
-	public void TestPreClusterReversedGroupby() throws Exception {
+	public void TestPreClusterReversedGroupby() throws Exception{
 		conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
 		conf.setBoolean(GenomixJob.REVERSED_KMER, true);
 		System.err.println("Testing PreclusterGroupBy + Reversed");
 		driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
 		Assert.assertEquals(true, checkResults(EXPECTED_REVERSE_PATH));
 	}
-
-	public void TestHybridReversedGroupby() throws Exception {
+	public void TestHybridReversedGroupby() throws Exception{
 		conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
 		conf.setBoolean(GenomixJob.REVERSED_KMER, true);
 		System.err.println("Testing HybridGroupBy + Reversed");
@@ -187,21 +186,22 @@
 							DUMPED_RESULT), false, conf, null);
 			dumped = new File(DUMPED_RESULT);
 		} else {
-
-			FileSystem.getLocal(new Configuration()).mkdirs(
-					new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH));
+			
+			FileSystem.getLocal(new Configuration()).mkdirs(new Path(ACTUAL_RESULT_DIR
+			+ HDFS_OUTPUT_PATH));
 			File filePathTo = new File(CONVERT_RESULT);
 			BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
 			for (int i = 0; i < numPartitionPerMachine * numberOfNC; i++) {
 				String partname = "/part-" + i;
-				FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
-						+ partname), FileSystem.getLocal(new Configuration()),
-						new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH
-								+ partname), false, conf);
-
-				Path path = new Path(HDFS_OUTPUT_PATH + partname);
+//				FileUtil.copy(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH
+//						+ partname), FileSystem.getLocal(new Configuration()),
+//						new Path(ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + partname), false, conf);
+				
+				
+				Path path = new Path(HDFS_OUTPUT_PATH
+						+ partname);
 				FileSystem dfs = FileSystem.get(conf);
-				if (dfs.getFileStatus(path).getLen() == 0) {
+				if (dfs.getFileStatus(path).getLen() == 0){
 					continue;
 				}
 				SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path,
@@ -213,14 +213,15 @@
 
 				int k = conf.getInt(GenomixJob.KMER_LENGTH, 25);
 				while (reader.next(key, value)) {
-					if (key == null || value == null) {
+					if (key == null || value == null){
 						break;
 					}
 					bw.write(Kmer.recoverKmerFrom(k, key.getBytes(), 0,
 							key.getLength())
 							+ "\t" + value.toString());
-					System.out.println(Kmer.recoverKmerFrom(k, key.getBytes(),
-							0, key.getLength()) + "\t" + value.toString());
+					System.out.println(Kmer.recoverKmerFrom(k, key.getBytes(), 0,
+							key.getLength())
+							+ "\t" + value.toString());
 					bw.newLine();
 				}
 				reader.close();
@@ -230,13 +231,13 @@
 			dumped = new File(CONVERT_RESULT);
 		}
 
-		TestUtils.compareWithResult(new File(expectedPath), dumped);
+		TestUtils.compareWithSortedResult(new File(expectedPath), dumped);
 		return true;
 	}
 
 	@After
 	public void tearDown() throws Exception {
-		HyracksUtils.deinit();
+		edu.uci.ics.hyracks.hdfs.utils.HyracksUtils.deinit();
 		cleanupHDFS();
 	}
 
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
index 4c36eca..22688e0 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/TestUtils.java
@@ -18,8 +18,47 @@
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.Collections;
 
 public class TestUtils {
+    /**
+     * Compare the actual result against the sorted expected file.
+     * The actual file need not be sorted; its lines are sorted in memory before comparison.
+     * @param expectedFile
+     * @param actualFile
+     */
+    public static void compareWithSortedResult(File expectedFile, File actualFile) throws Exception {
+        BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
+        BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
+        ArrayList<String> actualLines = new ArrayList<String>();
+        String lineExpected, lineActual;
+        try {
+            while ((lineActual = readerActual.readLine()) != null) {
+                actualLines.add(lineActual);
+            }
+            Collections.sort(actualLines);
+            int num = 1;
+            for (String actualLine : actualLines) {
+                lineExpected = readerExpected.readLine();
+                if (lineExpected == null) {
+                    throw new Exception("Actual result has extra lines, starting at line "
+                            + num + ":\n< " + actualLine + "\n> ");
+                }
+                if (!equalStrings(lineExpected, actualLine)) {
+                    throw new Exception("Result changed at line " + num + ":\n< "
+                            + lineExpected + "\n> " + actualLine);
+                }
+                ++num;
+            }
+            lineExpected = readerExpected.readLine();
+            if (lineExpected != null) {
+                throw new Exception("Actual result is missing lines, starting at line "
+                        + num + ":\n< \n> " + lineExpected);
+            }
+        } finally {
+            readerActual.close();
+            readerExpected.close();
+        }
+    }
 
     public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
         BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
index 436e09f..5de9f37 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -1,5 +1,6 @@
 <?xml version="1.0"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>hyracks-hdfs-core</artifactId>
 	<name>hyracks-hdfs-core</name>
@@ -52,6 +53,18 @@
 					</filesets>
 				</configuration>
 			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.2</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>