Fix the normalized-key comparator and refactor the k-mer aggregators

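The hybrid-hash group-by orders tuples by a 4-byte normalized key (the value
KmerPointable.getIntReverse() builds from the high-order bytes), so
KmerPointable.compareTo() must rank k-mers exactly the way that key does.
Since the key uses all 32 bits, it has to be compared as an unsigned int; a
signed comparison would sort keys with the high bit set before smaller keys.
The sketch below is illustrative only (compareNormalizedKeys is a hypothetical
helper, not project code) and shows the rule the comparator now follows:

    // Illustrative sketch: rank two 4-byte normalized keys as unsigned ints.
    static int compareNormalizedKeys(int aKey, int bKey) {
        if (aKey == bKey) {
            return 0; // equal keys: fall back to the remaining low-order bytes
        }
        return ((aKey & 0xffffffffL) < (bKey & 0xffffffffL)) ? -1 : 1;
    }

When the keys are equal, compareTo() falls back to a byte-wise comparison of
the bytes not covered by the key, starting at index length - 5. The commit
also lifts the duplicated anonymous aggregators into LocalAggregatorDescriptor
and derives the distributed merge aggregator from it.
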
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3135 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
index b34772d..e7aa481 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
@@ -1,5 +1,6 @@
 package edu.uci.ics.genomix.data.std.accessors;
 
+import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 
@@ -10,11 +11,18 @@
 	public IBinaryHashFunction createBinaryHashFunction(final int seed) {
 
 		return new IBinaryHashFunction() {
-
+			private KmerPointable p = new KmerPointable();
+
 			@Override
 			public int hash(byte[] bytes, int offset, int length) {
-				return KmerHashPartitioncomputerFactory.hashBytes(bytes,
-						offset, length);
+				if (offset + length > bytes.length)
+					throw new IllegalStateException("hash range out of bounds");
+				p.set(bytes, offset, length);
+				int hash = p.hash() * (seed + 1);
+				if (hash < 0) {
+					hash = -(hash + 1);
+				}
+				return hash;
 			}
 		};
 	}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
index 7864830..675b589 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
@@ -41,15 +41,21 @@
 
 	public static short getShortReverse(byte[] bytes, int offset, int length) {
 		if (length < 2) {
-			return (short) (bytes[offset]);
+			return (short) (bytes[offset] & 0xff);
 		}
 		return (short) (((bytes[offset + length - 1] & 0xff) << 8) + (bytes[offset
 				+ length - 2] & 0xff));
 	}
 
 	public static int getIntReverse(byte[] bytes, int offset, int length) {
-		if (length < 4) {
-			return getShortReverse(bytes, offset, length);
+		int shortValue = getShortReverse(bytes, offset, length);
+
+		if (length < 3) {
+			return shortValue;
+		}
+		if (length == 3) {
+			return (((bytes[offset + 2] & 0xff) << 16)
+					+ ((bytes[offset + 1] & 0xff) << 8) + ((bytes[offset] & 0xff)));
 		}
 		return ((bytes[offset + length - 1] & 0xff) << 24)
 				+ ((bytes[offset + length - 2] & 0xff) << 16)
@@ -83,8 +89,18 @@
 		if (this.length != length) {
 			return this.length - length;
 		}
-
-		for (int i = length - 1; i >= 0; i--) {
+
+		// The ordering must stay consistent with the 4-byte normalized key
+		// and its usage, so compare that key first (as an unsigned int).
+		int bNormKey = getIntReverse(this.bytes, this.start, this.length);
+		int mNormKey = getIntReverse(bytes, offset, length);
+		int cmp = bNormKey - mNormKey;
+		if (cmp != 0) {
+			return ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1
+					: 1;
+		}
+
+		for (int i = length - 5; i >= 0; i--) {
 			if (this.bytes[this.start + i] < bytes[offset + i]) {
 				return -1;
 			} else if (this.bytes[this.start + i] > bytes[offset + i]) {
@@ -96,7 +112,9 @@
 
 	@Override
 	public int hash() {
-		return KmerHashPartitioncomputerFactory.hashBytes(bytes, start, length);
+		int hash = KmerHashPartitioncomputerFactory.hashBytes(bytes, start,
+				length);
+		return hash;
 	}
 
 	@Override
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index 98a0cd9..04349b0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -1,15 +1,9 @@
 package edu.uci.ics.genomix.dataflow.aggregators;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
@@ -20,120 +14,24 @@
 public class DistributedMergeLmerAggregateFactory implements
 		IAggregatorDescriptorFactory {
 	private static final long serialVersionUID = 1L;
-	private static final int MAX = 127;
 
 	public DistributedMergeLmerAggregateFactory() {
 	}
 
+	public class DistributeAggregatorDescriptor extends
+			LocalAggregatorDescriptor {
+		@Override
+		protected byte getCount(IFrameTupleAccessor accessor, int tIndex) {
+			return super.getField(accessor, tIndex, 2);
+		}
+	}
+
 	@Override
 	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
 			RecordDescriptor inRecordDescriptor,
 			RecordDescriptor outRecordDescriptor, int[] keyFields,
 			int[] keyFieldsInPartialResults) throws HyracksDataException {
-		return new IAggregatorDescriptor() {
-
-			@Override
-			public void reset() {
-			}
-
-			@Override
-			public void close() {
-				// TODO Auto-generated method stub
-
-			}
-
-			@Override
-			public AggregateState createAggregateStates() {
-				// TODO Auto-generated method stub
-				return new AggregateState(new Object() {
-				});
-			}
-
-			private byte getField(IFrameTupleAccessor accessor, int tIndex,
-					int fieldId) {
-				int tupleOffset = accessor.getTupleStartOffset(tIndex);
-				int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
-				int offset = tupleOffset + fieldStart
-						+ accessor.getFieldSlotsLength();
-				byte data = ByteSerializerDeserializer.getByte(accessor
-						.getBuffer().array(), offset);
-				return data;
-			}
-
-			/**
-			 * met a new kmer
-			 */
-			@Override
-			public void init(ArrayTupleBuilder tupleBuilder,
-					IFrameTupleAccessor accessor, int tIndex,
-					AggregateState state) throws HyracksDataException {
-
-				byte bitmap = getField(accessor, tIndex, 1);
-				byte count = getField(accessor, tIndex, 2);
-				DataOutput fieldOutput = tupleBuilder.getDataOutput();
-				try {
-					fieldOutput.writeByte(bitmap);
-					tupleBuilder.addFieldEndOffset();
-					fieldOutput.writeByte(count);
-					tupleBuilder.addFieldEndOffset();
-				} catch (IOException e) {
-					throw new HyracksDataException(
-							"I/O exception when initializing the aggregator.");
-				}
-			}
-
-			@Override
-			public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-					IFrameTupleAccessor stateAccessor, int stateTupleIndex,
-					AggregateState state) throws HyracksDataException {
-				byte bitmap = getField(accessor, tIndex, 1);
-				byte count = getField(accessor, tIndex, 2);
-
-				int statetupleOffset = stateAccessor
-						.getTupleStartOffset(stateTupleIndex);
-				int statefieldStart = stateAccessor.getFieldStartOffset(
-						stateTupleIndex, 1);
-				int stateoffset = statetupleOffset
-						+ stateAccessor.getFieldSlotsLength() + statefieldStart;
-
-				byte[] data = stateAccessor.getBuffer().array();
-
-				bitmap |= data[stateoffset];
-				count += data[stateoffset + 1];
-				if (count >= MAX) {
-					count = (byte) MAX;
-				}
-				data[stateoffset] = bitmap;
-				data[stateoffset + 1] = (byte) count;
-			}
-
-			@Override
-			public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
-					IFrameTupleAccessor accessor, int tIndex,
-					AggregateState state) throws HyracksDataException {
-				byte bitmap = getField(accessor, tIndex, 1);
-				byte count = getField(accessor, tIndex, 2);
-				DataOutput fieldOutput = tupleBuilder.getDataOutput();
-				try {
-					fieldOutput.writeByte(bitmap);
-					tupleBuilder.addFieldEndOffset();
-					fieldOutput.writeByte(count);
-					tupleBuilder.addFieldEndOffset();
-				} catch (IOException e) {
-					throw new HyracksDataException(
-							"I/O exception when writing aggregation to the output buffer.");
-				}
-
-			}
-
-			@Override
-			public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
-					IFrameTupleAccessor accessor, int tIndex,
-					AggregateState state) throws HyracksDataException {
-				outputPartialResult(tupleBuilder, accessor, tIndex, state);
-			}
-
-		};
+		return new DistributeAggregatorDescriptor();
 	}
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
new file mode 100644
index 0000000..a592cc2
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
@@ -0,0 +1,120 @@
+package edu.uci.ics.genomix.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+public class LocalAggregatorDescriptor implements IAggregatorDescriptor {
+	private static final int MAX = 127;
+
+	@Override
+	public void reset() {
+	}
+
+	@Override
+	public void close() {
+		// nothing to clean up
+
+	}
+
+	@Override
+	public AggregateState createAggregateStates() {
+		return new AggregateState(new Object() {
+		});
+	}
+
+	protected byte getField(IFrameTupleAccessor accessor, int tIndex,
+			int fieldId) {
+		int tupleOffset = accessor.getTupleStartOffset(tIndex);
+		int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+		int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+		byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer()
+				.array(), offset);
+		return data;
+	}
+
+	protected byte getCount(IFrameTupleAccessor accessor, int tIndex) {
+		return 1;
+	}
+
+	@Override
+	public void init(ArrayTupleBuilder tupleBuilder,
+			IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+			throws HyracksDataException {
+		byte bitmap = getField(accessor, tIndex, 1);
+		byte count = getCount(accessor, tIndex);
+
+		DataOutput fieldOutput = tupleBuilder.getDataOutput();
+		try {
+			fieldOutput.writeByte(bitmap);
+			tupleBuilder.addFieldEndOffset();
+			fieldOutput.writeByte(count);
+			tupleBuilder.addFieldEndOffset();
+		} catch (IOException e) {
+			throw new HyracksDataException(
+					"I/O exception when initializing the aggregator.");
+		}
+	}
+
+	@Override
+	public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+			IFrameTupleAccessor stateAccessor, int stateTupleIndex,
+			AggregateState state) throws HyracksDataException {
+		byte bitmap = getField(accessor, tIndex, 1);
+		short count = getCount(accessor, tIndex);
+
+		int statetupleOffset = stateAccessor
+				.getTupleStartOffset(stateTupleIndex);
+		int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex,
+				1);
+		int countfieldStart = stateAccessor.getFieldStartOffset(
+				stateTupleIndex, 2);
+		int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength()
+				+ bitfieldStart;
+		int countoffset = statetupleOffset
+				+ stateAccessor.getFieldSlotsLength() + countfieldStart;
+
+		byte[] data = stateAccessor.getBuffer().array();
+
+		bitmap |= data[bitoffset];
+		count += data[countoffset];
+		if (count >= MAX) {
+			count = (byte) MAX;
+		}
+		data[bitoffset] = bitmap;
+		data[countoffset] = (byte) count;
+	}
+
+	@Override
+	public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
+			IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+			throws HyracksDataException {
+		byte bitmap = getField(accessor, tIndex, 1);
+		byte count = getField(accessor, tIndex, 2);
+		DataOutput fieldOutput = tupleBuilder.getDataOutput();
+		try {
+			fieldOutput.writeByte(bitmap);
+			tupleBuilder.addFieldEndOffset();
+			fieldOutput.writeByte(count);
+			tupleBuilder.addFieldEndOffset();
+		} catch (IOException e) {
+			throw new HyracksDataException(
+					"I/O exception when writing aggregation to the output buffer.");
+		}
+
+	}
+
+	@Override
+	public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
+			IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+			throws HyracksDataException {
+		outputPartialResult(tupleBuilder, accessor, tIndex, state);
+	}
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index 08ff462..58ff8a2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -1,15 +1,8 @@
 package edu.uci.ics.genomix.dataflow.aggregators;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
@@ -19,7 +12,6 @@
  */
 public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
 	private static final long serialVersionUID = 1L;
-	private static final int MAX = 127;
 
 	public MergeKmerAggregateFactory() {
 	}
@@ -29,108 +21,7 @@
 			RecordDescriptor inRecordDescriptor,
 			RecordDescriptor outRecordDescriptor, int[] keyFields,
 			int[] keyFieldsInPartialResults) throws HyracksDataException {
-		return new IAggregatorDescriptor() {
-
-			@Override
-			public void reset() {
-			}
-
-			@Override
-			public void close() {
-				// TODO Auto-generated method stub
-
-			}
-
-			@Override
-			public AggregateState createAggregateStates() {
-				// TODO Auto-generated method stub
-				return new AggregateState(new Object() {
-				});
-			}
-
-			private byte getField(IFrameTupleAccessor accessor, int tIndex,
-					int fieldId) {
-				int tupleOffset = accessor.getTupleStartOffset(tIndex);
-				int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
-				int offset = tupleOffset + fieldStart
-						+ accessor.getFieldSlotsLength();
-				byte data = ByteSerializerDeserializer.getByte(accessor
-						.getBuffer().array(), offset);
-				return data;
-			}
-
-			@Override
-			public void init(ArrayTupleBuilder tupleBuilder,
-					IFrameTupleAccessor accessor, int tIndex,
-					AggregateState state) throws HyracksDataException {
-				byte bitmap = getField(accessor, tIndex, 1);
-				byte count = 1;
-
-				DataOutput fieldOutput = tupleBuilder.getDataOutput();
-				try {
-					fieldOutput.writeByte(bitmap);
-					tupleBuilder.addFieldEndOffset();
-					fieldOutput.writeByte(count);
-					tupleBuilder.addFieldEndOffset();
-				} catch (IOException e) {
-					throw new HyracksDataException(
-							"I/O exception when initializing the aggregator.");
-				}
-
-			}
-
-			@Override
-			public void aggregate(IFrameTupleAccessor accessor, int tIndex,
-					IFrameTupleAccessor stateAccessor, int stateTupleIndex,
-					AggregateState state) throws HyracksDataException {
-				byte bitmap = getField(accessor, tIndex, 1);
-				short count = 1;
-
-				int statetupleOffset = stateAccessor
-						.getTupleStartOffset(stateTupleIndex);
-				int statefieldStart = stateAccessor.getFieldStartOffset(
-						stateTupleIndex, 1);
-				int stateoffset = statetupleOffset
-						+ stateAccessor.getFieldSlotsLength() + statefieldStart;
-
-				byte[] data = stateAccessor.getBuffer().array();
-
-				bitmap |= data[stateoffset];
-				count += data[stateoffset + 1];
-				if (count >= MAX) {
-					count = (byte) MAX;
-				}
-				data[stateoffset] = bitmap;
-				data[stateoffset + 1] = (byte) count;
-			}
-
-			@Override
-			public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
-					IFrameTupleAccessor accessor, int tIndex,
-					AggregateState state) throws HyracksDataException {
-				byte bitmap = getField(accessor, tIndex, 1);
-				byte count = getField(accessor, tIndex, 2);
-				DataOutput fieldOutput = tupleBuilder.getDataOutput();
-				try {
-					fieldOutput.writeByte(bitmap);
-					tupleBuilder.addFieldEndOffset();
-					fieldOutput.writeByte(count);
-					tupleBuilder.addFieldEndOffset();
-				} catch (IOException e) {
-					throw new HyracksDataException(
-							"I/O exception when writing aggregation to the output buffer.");
-				}
-
-			}
-
-			@Override
-			public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
-					IFrameTupleAccessor accessor, int tIndex,
-					AggregateState state) throws HyracksDataException {
-				outputPartialResult(tupleBuilder, accessor, tIndex, state);
-			}
-
-		};
+		return new LocalAggregatorDescriptor();
 	}
 
 }
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index c4a6300..b8e4219 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -8,9 +8,9 @@
 import org.apache.hadoop.mapred.JobConf;
 
 import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.data.std.accessors.KmerBinaryHashFunctionFamily;
 import edu.uci.ics.genomix.data.std.accessors.KmerHashPartitioncomputerFactory;
 import edu.uci.ics.genomix.data.std.accessors.KmerNormarlizedComputerFactory;
-import edu.uci.ics.genomix.data.std.accessors.KmerBinaryHashFunctionFamily;
 import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
 import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.genomix.dataflow.KMerSequenceWriterFactory;
@@ -79,14 +79,14 @@
 	private int recordSizeInBytes;
 	private int hashfuncStartLevel;
 
-	private void logDebug(String status){
+	private void logDebug(String status) {
 		String names = "";
-		for (String str : ncNodeNames){
+		for (String str : ncNodeNames) {
 			names += str + " ";
 		}
 		LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
 	}
-	
+
 	public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
 			final Map<String, NodeControllerInfo> ncMap,
 			int numPartitionPerMachine) {
@@ -126,7 +126,8 @@
 	private HybridHashGroupOperatorDescriptor newHybridGroupby(
 			JobSpecification jobSpec, int[] keyFields,
 			long inputSizeInRawRecords, long inputSizeInUniqueKeys,
-			int recordSizeInBytes, int hashfuncStartLevel)
+			int recordSizeInBytes, int hashfuncStartLevel,
+			IAggregatorDescriptorFactory aggregator)
 			throws HyracksDataException {
 		return new HybridHashGroupOperatorDescriptor(
 				jobSpec,
@@ -140,9 +141,8 @@
 						.of(KmerPointable.FACTORY) },
 				new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() },
 				hashfuncStartLevel, new KmerNormarlizedComputerFactory(),
-				new MergeKmerAggregateFactory(),
-				new DistributedMergeLmerAggregateFactory(), combineOutputRec,
-				true);
+				aggregator, new DistributedMergeLmerAggregateFactory(),
+				combineOutputRec, true);
 	}
 
 	private void generateDescriptorbyType(JobSpecification jobSpec)
@@ -177,16 +177,17 @@
 			break;
 		case HYBRIDHASH:
 		default:
-
 			singleGrouper = newHybridGroupby(jobSpec, keyFields,
 					inputSizeInRawRecords, inputSizeInUniqueKeys,
-					recordSizeInBytes, hashfuncStartLevel);
+					recordSizeInBytes, hashfuncStartLevel,
+					new MergeKmerAggregateFactory());
 			connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
 					new KmerHashPartitioncomputerFactory());
 
 			crossGrouper = newHybridGroupby(jobSpec, keyFields,
 					inputSizeInRawRecords, inputSizeInUniqueKeys,
-					recordSizeInBytes, hashfuncStartLevel);
+					recordSizeInBytes, hashfuncStartLevel,
+					new DistributedMergeLmerAggregateFactory());
 			break;
 		}
 	}
@@ -201,7 +202,7 @@
 			LOG.info("HDFS read into " + splits.length + " splits");
 			String[] readSchedule = scheduler.getLocationConstraints(splits);
 			String log = "";
-			for (String schedule: readSchedule){
+			for (String schedule : readSchedule) {
 				log += schedule + " ";
 			}
 			LOG.info("HDFS read schedule " + log);
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index 3e80ab7..4eb7b9f 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -145,7 +145,7 @@
 
 	public void TestHybridGroupby() throws Exception {
 		conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
-		conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
+		conf.set(GenomixJob.OUTPUT_FORMAT, "text");
 		System.err.println("Testing HybridGroupBy");
 		driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
 		Assert.assertEquals(true, checkResults());
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
index f63a141..f5a05a8 100755
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
@@ -1,4 +1,8 @@
 @625E1AAXX100810:1:100:10000:10271/1
 AATAGAAG
+AATAGAAG
 +
 EDBDB?BEEEDGGEGGGDGGGA>DG@GGD;GD@DG@F?<B<BFFD?
+AATAGAAG
+AATAGAAG
+AATAGAAG
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result2 b/genomix/genomix-hyracks/src/test/resources/expected/result2
index 5e76458..9296453 100755
--- a/genomix/genomix-hyracks/src/test/resources/expected/result2
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result2
@@ -1,4 +1,4 @@
-AATAG	|A	1
-AGAAG	T|	1
-ATAGA	A|A	1
-TAGAA	A|G	1
+AATAG	|A	5
+AGAAG	T|	5
+ATAGA	A|A	5
+TAGAA	A|G	5