fix normalized comparator
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3135 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
index b34772d..e7aa481 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/KmerBinaryHashFunctionFamily.java
@@ -1,5 +1,6 @@
package edu.uci.ics.genomix.data.std.accessors;
+import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
@@ -10,11 +11,18 @@
public IBinaryHashFunction createBinaryHashFunction(final int seed) {
return new IBinaryHashFunction() {
-
+ private KmerPointable p = new KmerPointable();
+
@Override
public int hash(byte[] bytes, int offset, int length) {
- return KmerHashPartitioncomputerFactory.hashBytes(bytes,
- offset, length);
+ if (length + offset >= bytes.length)
+ throw new IllegalStateException("out of bound");
+ p.set(bytes, offset, length);
+ int hash = p.hash() * (seed + 1);
+ if (hash < 0) {
+ hash = -(hash+1);
+ }
+ return hash;
}
};
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
index 7864830..675b589 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/KmerPointable.java
@@ -41,15 +41,21 @@
public static short getShortReverse(byte[] bytes, int offset, int length) {
if (length < 2) {
- return (short) (bytes[offset]);
+ return (short) (bytes[offset] & 0xff);
}
return (short) (((bytes[offset + length - 1] & 0xff) << 8) + (bytes[offset
+ length - 2] & 0xff));
}
public static int getIntReverse(byte[] bytes, int offset, int length) {
- if (length < 4) {
- return getShortReverse(bytes, offset, length);
+ int shortValue = getShortReverse(bytes, offset, length);
+
+ if (length < 3) {
+ return shortValue;
+ }
+ if (length == 3) {
+ return (((bytes[offset + 2] & 0xff) << 16)
+ + ((bytes[offset + 1] & 0xff) << 8) + ((bytes[offset] & 0xff)));
}
return ((bytes[offset + length - 1] & 0xff) << 24)
+ ((bytes[offset + length - 2] & 0xff) << 16)
@@ -83,8 +89,18 @@
if (this.length != length) {
return this.length - length;
}
-
- for (int i = length - 1; i >= 0; i--) {
+
+ // Why did we write so much here?
+ // We need to follow the normalized key and its usage:
+ int bNormKey = getIntReverse(this.bytes, this.start, this.length);
+ int mNormKey = getIntReverse(bytes, offset, length);
+ int cmp = bNormKey - mNormKey;
+ if ( cmp != 0){
+ return ((((long) bNormKey) & 0xffffffffL) < (((long) mNormKey) & 0xffffffffL)) ? -1
+ : 1;
+ }
+
+ for (int i = length - 5; i >= 0; i--) {
if (this.bytes[this.start + i] < bytes[offset + i]) {
return -1;
} else if (this.bytes[this.start + i] > bytes[offset + i]) {
@@ -96,7 +112,9 @@
@Override
public int hash() {
- return KmerHashPartitioncomputerFactory.hashBytes(bytes, start, length);
+ int hash = KmerHashPartitioncomputerFactory.hashBytes(bytes, start,
+ length);
+ return hash;
}
@Override
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index 98a0cd9..04349b0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -1,15 +1,9 @@
package edu.uci.ics.genomix.dataflow.aggregators;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
@@ -20,120 +14,24 @@
public class DistributedMergeLmerAggregateFactory implements
IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
- private static final int MAX = 127;
public DistributedMergeLmerAggregateFactory() {
}
+ public class DistributeAggregatorDescriptor extends
+ LocalAggregatorDescriptor {
+ @Override
+ protected byte getCount(IFrameTupleAccessor accessor, int tIndex) {
+ return super.getField(accessor, tIndex, 2);
+ }
+ }
+
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields,
int[] keyFieldsInPartialResults) throws HyracksDataException {
- return new IAggregatorDescriptor() {
-
- @Override
- public void reset() {
- }
-
- @Override
- public void close() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public AggregateState createAggregateStates() {
- // TODO Auto-generated method stub
- return new AggregateState(new Object() {
- });
- }
-
- private byte getField(IFrameTupleAccessor accessor, int tIndex,
- int fieldId) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
- int offset = tupleOffset + fieldStart
- + accessor.getFieldSlotsLength();
- byte data = ByteSerializerDeserializer.getByte(accessor
- .getBuffer().array(), offset);
- return data;
- }
-
- /**
- * met a new kmer
- */
- @Override
- public void init(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
-
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
- }
- }
-
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- IFrameTupleAccessor stateAccessor, int stateTupleIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
-
- int statetupleOffset = stateAccessor
- .getTupleStartOffset(stateTupleIndex);
- int statefieldStart = stateAccessor.getFieldStartOffset(
- stateTupleIndex, 1);
- int stateoffset = statetupleOffset
- + stateAccessor.getFieldSlotsLength() + statefieldStart;
-
- byte[] data = stateAccessor.getBuffer().array();
-
- bitmap |= data[stateoffset];
- count += data[stateoffset + 1];
- if (count >= MAX) {
- count = (byte) MAX;
- }
- data[stateoffset] = bitmap;
- data[stateoffset + 1] = (byte) count;
- }
-
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
- }
-
- }
-
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
-
- };
+ return new DistributeAggregatorDescriptor();
}
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
new file mode 100644
index 0000000..a592cc2
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/LocalAggregatorDescriptor.java
@@ -0,0 +1,120 @@
+package edu.uci.ics.genomix.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+
+public class LocalAggregatorDescriptor implements IAggregatorDescriptor {
+ private static final int MAX = 127;
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new Object() {
+ });
+ }
+
+ protected byte getField(IFrameTupleAccessor accessor, int tIndex,
+ int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ byte data = ByteSerializerDeserializer.getByte(accessor.getBuffer()
+ .array(), offset);
+ return data;
+ }
+
+ protected byte getCount(IFrameTupleAccessor accessor, int tIndex) {
+ return 1;
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ byte count = getCount(accessor, tIndex);
+
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when initializing the aggregator.");
+ }
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+ IFrameTupleAccessor stateAccessor, int stateTupleIndex,
+ AggregateState state) throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ short count = getCount(accessor, tIndex);
+
+ int statetupleOffset = stateAccessor
+ .getTupleStartOffset(stateTupleIndex);
+ int bitfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex,
+ 1);
+ int countfieldStart = stateAccessor.getFieldStartOffset(
+ stateTupleIndex, 2);
+ int bitoffset = statetupleOffset + stateAccessor.getFieldSlotsLength()
+ + bitfieldStart;
+ int countoffset = statetupleOffset
+ + stateAccessor.getFieldSlotsLength() + countfieldStart;
+
+ byte[] data = stateAccessor.getBuffer().array();
+
+ bitmap |= data[bitoffset];
+ count += data[countoffset];
+ if (count >= MAX) {
+ count = (byte) MAX;
+ }
+ data[bitoffset] = bitmap;
+ data[countoffset] = (byte) count;
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ byte bitmap = getField(accessor, tIndex, 1);
+ byte count = getField(accessor, tIndex, 2);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when writing aggregation to the output buffer.");
+ }
+
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ outputPartialResult(tupleBuilder, accessor, tIndex, state);
+ }
+
+};
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index 08ff462..58ff8a2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -1,15 +1,8 @@
package edu.uci.ics.genomix.dataflow.aggregators;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
@@ -19,7 +12,6 @@
*/
public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
private static final long serialVersionUID = 1L;
- private static final int MAX = 127;
public MergeKmerAggregateFactory() {
}
@@ -29,108 +21,7 @@
RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields,
int[] keyFieldsInPartialResults) throws HyracksDataException {
- return new IAggregatorDescriptor() {
-
- @Override
- public void reset() {
- }
-
- @Override
- public void close() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public AggregateState createAggregateStates() {
- // TODO Auto-generated method stub
- return new AggregateState(new Object() {
- });
- }
-
- private byte getField(IFrameTupleAccessor accessor, int tIndex,
- int fieldId) {
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
- int offset = tupleOffset + fieldStart
- + accessor.getFieldSlotsLength();
- byte data = ByteSerializerDeserializer.getByte(accessor
- .getBuffer().array(), offset);
- return data;
- }
-
- @Override
- public void init(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = 1;
-
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when initializing the aggregator.");
- }
-
- }
-
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex,
- IFrameTupleAccessor stateAccessor, int stateTupleIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- short count = 1;
-
- int statetupleOffset = stateAccessor
- .getTupleStartOffset(stateTupleIndex);
- int statefieldStart = stateAccessor.getFieldStartOffset(
- stateTupleIndex, 1);
- int stateoffset = statetupleOffset
- + stateAccessor.getFieldSlotsLength() + statefieldStart;
-
- byte[] data = stateAccessor.getBuffer().array();
-
- bitmap |= data[stateoffset];
- count += data[stateoffset + 1];
- if (count >= MAX) {
- count = (byte) MAX;
- }
- data[stateoffset] = bitmap;
- data[stateoffset + 1] = (byte) count;
- }
-
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = getField(accessor, tIndex, 1);
- byte count = getField(accessor, tIndex, 2);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(
- "I/O exception when writing aggregation to the output buffer.");
- }
-
- }
-
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
- IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- outputPartialResult(tupleBuilder, accessor, tIndex, state);
- }
-
- };
+ return new LocalAggregatorDescriptor();
}
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index c4a6300..b8e4219 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -8,9 +8,9 @@
import org.apache.hadoop.mapred.JobConf;
import edu.uci.ics.genomix.data.std.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.data.std.accessors.KmerBinaryHashFunctionFamily;
import edu.uci.ics.genomix.data.std.accessors.KmerHashPartitioncomputerFactory;
import edu.uci.ics.genomix.data.std.accessors.KmerNormarlizedComputerFactory;
-import edu.uci.ics.genomix.data.std.accessors.KmerBinaryHashFunctionFamily;
import edu.uci.ics.genomix.data.std.primitive.KmerPointable;
import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.genomix.dataflow.KMerSequenceWriterFactory;
@@ -79,14 +79,14 @@
private int recordSizeInBytes;
private int hashfuncStartLevel;
- private void logDebug(String status){
+ private void logDebug(String status) {
String names = "";
- for (String str : ncNodeNames){
+ for (String str : ncNodeNames) {
names += str + " ";
}
LOG.info(status + " nc nodes:" + ncNodeNames.length + " " + names);
}
-
+
public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
final Map<String, NodeControllerInfo> ncMap,
int numPartitionPerMachine) {
@@ -126,7 +126,8 @@
private HybridHashGroupOperatorDescriptor newHybridGroupby(
JobSpecification jobSpec, int[] keyFields,
long inputSizeInRawRecords, long inputSizeInUniqueKeys,
- int recordSizeInBytes, int hashfuncStartLevel)
+ int recordSizeInBytes, int hashfuncStartLevel,
+ IAggregatorDescriptorFactory aggeragater)
throws HyracksDataException {
return new HybridHashGroupOperatorDescriptor(
jobSpec,
@@ -140,9 +141,8 @@
.of(KmerPointable.FACTORY) },
new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() },
hashfuncStartLevel, new KmerNormarlizedComputerFactory(),
- new MergeKmerAggregateFactory(),
- new DistributedMergeLmerAggregateFactory(), combineOutputRec,
- true);
+ aggeragater, new DistributedMergeLmerAggregateFactory(),
+ combineOutputRec, true);
}
private void generateDescriptorbyType(JobSpecification jobSpec)
@@ -177,16 +177,17 @@
break;
case HYBRIDHASH:
default:
-
singleGrouper = newHybridGroupby(jobSpec, keyFields,
inputSizeInRawRecords, inputSizeInUniqueKeys,
- recordSizeInBytes, hashfuncStartLevel);
+ recordSizeInBytes, hashfuncStartLevel,
+ new MergeKmerAggregateFactory());
connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
new KmerHashPartitioncomputerFactory());
crossGrouper = newHybridGroupby(jobSpec, keyFields,
inputSizeInRawRecords, inputSizeInUniqueKeys,
- recordSizeInBytes, hashfuncStartLevel);
+ recordSizeInBytes, hashfuncStartLevel,
+ new DistributedMergeLmerAggregateFactory());
break;
}
}
@@ -201,7 +202,7 @@
LOG.info("HDFS read into " + splits.length + " splits");
String[] readSchedule = scheduler.getLocationConstraints(splits);
String log = "";
- for (String schedule: readSchedule){
+ for (String schedule : readSchedule) {
log += schedule + " ";
}
LOG.info("HDFS read schedule " + log);
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index 3e80ab7..4eb7b9f 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -145,7 +145,7 @@
public void TestHybridGroupby() throws Exception {
conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
- conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
+ conf.set(GenomixJob.OUTPUT_FORMAT, "text");
System.err.println("Testing HybridGroupBy");
driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults());
diff --git a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
index f63a141..f5a05a8 100755
--- a/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
+++ b/genomix/genomix-hyracks/src/test/resources/data/webmap/text.txt
@@ -1,4 +1,8 @@
@625E1AAXX100810:1:100:10000:10271/1
AATAGAAG
+AATAGAAG
+
EDBDB?BEEEDGGEGGGDGGGA>DG@GGD;GD@DG@F?<B<BFFD?
+AATAGAAG
+AATAGAAG
+AATAGAAG
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result2 b/genomix/genomix-hyracks/src/test/resources/expected/result2
index 5e76458..9296453 100755
--- a/genomix/genomix-hyracks/src/test/resources/expected/result2
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result2
@@ -1,4 +1,4 @@
-AATAG |A 1
-AGAAG T| 1
-ATAGA A|A 1
-TAGAA A|G 1
+AATAG |A 5
+AGAAG T| 5
+ATAGA A|A 5
+TAGAA A|G 5