fix the issue of reusing the same output RecordDescriptor for the reader and the group-by operators
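
As the diff below shows, JobGenBrujinGraph previously used a single outputRec for both the HDFS read operator and the combine/group-by operators; it is now split into readOutputRec (k-mer bytes plus adjacency byte) and combineOutputRec (which additionally carries what appears to be the count byte). The old inner KMerCountValue in KMerSequenceWriterFactory is extracted into the new type edu.uci.ics.genomix.type.KmerCountValue, the writer is connected after crossGrouper instead of directly after the reader, and the rest is formatting cleanup.

For reference, the two descriptors as they appear in JobGenBrujinGraph.generateJob() in this change (the null entry is the raw k-mer field; the byte fields are, presumably, adjacency and count):

    readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
            null, ByteSerializerDeserializer.INSTANCE });
    combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
            null, ByteSerializerDeserializer.INSTANCE,
            ByteSerializerDeserializer.INSTANCE });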

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2958 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
index f3efd45..dae1b6f 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -5,9 +5,8 @@
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

 

 /**

- * Aggregation sort: speed up 

- * from hyracks

- *

+ * Aggregation sort: speed up from hyracks

+ * 

  */

 public class Integer64NormalizedKeyComputerFactory implements

 		INormalizedKeyComputerFactory {

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
index 54fb62b..d099fc0 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
@@ -29,7 +29,8 @@
 				int slotLength = accessor.getFieldSlotsLength();

 

 				ByteBuffer buf = accessor.getBuffer();

-				long l = BufferSerDeUtils.getLong(buf.array(), startOffset + fieldOffset + slotLength);

+				long l = BufferSerDeUtils.getLong(buf.array(), startOffset

+						+ fieldOffset + slotLength);

 				return (int) (l % nParts);

 			}

 		};

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
index ffa6fc6..c49d6ff 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
@@ -78,11 +78,11 @@
 	public int compareTo(byte[] bytes, int start, int length) {

 

 		int be = this.start;

-		

-		if(this.bytes[be] != bytes[start]){

+

+		if (this.bytes[be] != bytes[start]) {

 			return (this.bytes[be] < bytes[start]) ? -1 : 1;

 		}

-		

+

 		int n = this.bytes[be];

 		int l = (int) Math.ceil(n / 4);

 		for (int i = l; i > 0; i--) {

@@ -99,19 +99,18 @@
 	public int hash() {// BKDRHash

 		int hash = 1;

 		for (int i = start + 1; i <= start + length; i++)

-			hash = (31 * hash) + (int)bytes[i];

-		if(hash < 0){

+			hash = (31 * hash) + (int) bytes[i];

+		if (hash < 0) {

 			hash = -hash;

 		}

-		//System.err.println(hash);

+		// System.err.println(hash);

 		return hash;

-/*		int seed = 131; // 31 131 1313 13131 131313 etc..

-		int hash = 0;

-		int l = (int) Math.ceil((double) bytes[start] / 4.0);

-		for (int i = start + 1; i <= start + l; i++) {

-			hash = hash * seed + bytes[i];

-		}

-		return (hash & 0x7FFFFFFF);*/

+		/*

+		 * int seed = 131; // 31 131 1313 13131 131313 etc.. int hash = 0; int l

+		 * = (int) Math.ceil((double) bytes[start] / 4.0); for (int i = start +

+		 * 1; i <= start + l; i++) { hash = hash * seed + bytes[i]; } return

+		 * (hash & 0x7FFFFFFF);

+		 */

 	}

 

 	@Override

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
index ed114ff..d3de2ba 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -9,7 +9,7 @@
 

 /**

  * used by precluster groupby

- *

+ * 

  */

 public class ConnectorPolicyAssignmentPolicy implements

 		IConnectorPolicyAssignmentPolicy {

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index 5576fd4..6951eef 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -72,8 +72,8 @@
 			private ArrayTupleBuilder tupleBuilder;

 			private ByteBuffer outputBuffer;

 			private FrameTupleAppender outputAppender;

-			

-			private byte []filter = new byte[4];

+

+			private byte[] filter = new byte[4];

 

 			@SuppressWarnings("resource")

 			@Override

@@ -83,9 +83,9 @@
 				outputBuffer = ctx.allocateFrame();

 				outputAppender = new FrameTupleAppender(ctx.getFrameSize());

 				outputAppender.reset(outputBuffer, true);

-				

+

 				Kmer.initializeFilter(k, filter);

-				

+

 				try {// one try with multiple catch?

 					writer.open();

 					String s = pathSurfix + String.valueOf(temp);

@@ -99,19 +99,19 @@
 								new InputStreamReader(

 										new FileInputStream(fa[i])));

 						String read = readsfile.readLine();

-						//int count  = 0;

+						// int count = 0;

 						while (read != null) {

 							read = readsfile.readLine();

-							//if(count % 4 == 1)

+							// if(count % 4 == 1)

 							SplitReads(read.getBytes());

 							// read.getBytes();

 							read = readsfile.readLine();

-							

+

 							read = readsfile.readLine();

 

 							read = readsfile.readLine();

-							//count += 1;

-							//System.err.println(count);

+							// count += 1;

+							// System.err.println(count);

 						}

 					}

 					if (outputAppender.getTupleCount() > 0) {

@@ -127,7 +127,6 @@
 				}

 			}

 

-

 			private void SplitReads(byte[] array) {

 				try {

 					byte[] bytes = null;

@@ -139,7 +138,7 @@
 						if (0 == i) {

 							bytes = Kmer.CompressKmer(k, array, i);

 						} else {

-							Kmer.MoveKmer(k, bytes, array[i+k-1], filter);

+							Kmer.MoveKmer(k, bytes, array[i + k - 1], filter);

 							/*

 							 * l <<= 2; l &= window; l |= ConvertSymbol(array[i

 							 * + k - 1]);

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
index 6928f18..1370494 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
@@ -1,18 +1,16 @@
 package edu.uci.ics.genomix.dataflow;
 
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.ValueBytes;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapred.JobConf;
 
+import edu.uci.ics.genomix.type.KmerCountValue;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
@@ -23,64 +21,42 @@
 
 	private static final long serialVersionUID = 1L;
 	private ConfFactory confFactory;
-	public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException{
-			this.confFactory = new ConfFactory(conf);
+
+	public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
+		this.confFactory = new ConfFactory(conf);
 	}
-	
-	public class KMerCountValue implements ValueBytes{
-		private ITupleReference tuple;
-		public KMerCountValue(ITupleReference tuple) {
-			this.tuple = tuple;
-		}
 
-		@Override
-		public int getSize() {
-			return tuple.getFieldLength(1) + tuple.getFieldLength(2);
-		}
-
-		@Override
-		public void writeCompressedBytes(DataOutputStream arg0)
-				throws IllegalArgumentException, IOException {
-			for(int i=1; i<3; i++){
-				arg0.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
-			}
-		}
-
-		@Override
-		public void writeUncompressedBytes(DataOutputStream arg0)
-				throws IOException {
-			for(int i=1; i<3; i++){
-				arg0.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
-			}
-		}
-		
-	}
-	
-	public class TupleWriter implements ITupleWriter{
-		public TupleWriter(ConfFactory cf){
+	public class TupleWriter implements ITupleWriter {
+		public TupleWriter(ConfFactory cf) {
 			this.cf = cf;
 		}
+
 		ConfFactory cf;
 		Writer writer = null;
+
 		/**
-		 * assumption is that output never change source! 
+		 * assumption is that output never change source!
 		 */
 		@Override
-		public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+		public void write(DataOutput output, ITupleReference tuple)
+				throws HyracksDataException {
 			try {
-				if (writer == null){
-					writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, BytesWritable.class, BytesWritable.class, CompressionType.NONE, null);
+				if (writer == null) {
+					writer = SequenceFile.createWriter(cf.getConf(),
+							(FSDataOutputStream) output, BytesWritable.class,
+							BytesWritable.class, CompressionType.NONE, null);
 				}
 				byte[] kmer = tuple.getFieldData(0);
 				int keyStart = tuple.getFieldStart(0);
 				int keyLength = tuple.getFieldLength(0);
-				writer.appendRaw(kmer, keyStart, keyLength, new KMerCountValue(tuple));
+				writer.appendRaw(kmer, keyStart, keyLength, new KmerCountValue(
+						tuple));
 			} catch (IOException e) {
 				throw new HyracksDataException(e);
 			}
 		}
 	}
-	
+
 	@Override
 	public ITupleWriter getTupleWriter() {
 		return new TupleWriter(confFactory);
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
index ebec17e..9f19858 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
@@ -3,7 +3,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 
 import edu.uci.ics.genomix.type.Kmer;
@@ -19,23 +18,29 @@
 	 */
 	private static final long serialVersionUID = 1L;
 	private final int KMER;
-	public KMerTextWriterFactory(int kmer){
+
+	public KMerTextWriterFactory(int kmer) {
 		KMER = kmer;
 	}
 
-	public class TupleWriter implements ITupleWriter{
+	public class TupleWriter implements ITupleWriter {
 		@Override
-		public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+		public void write(DataOutput output, ITupleReference tuple)
+				throws HyracksDataException {
 			try {
-				Text.writeString(output, Kmer.recoverKmerFrom(KMER,tuple.getFieldData(0),tuple.getFieldStart(0),tuple.getFieldLength(0)));
-				Text.writeString(output,"\t");
-				Text.writeString(output, Kmer.recoverAdjacent(tuple.getFieldData(1)[tuple.getFieldStart(1)]));
+				Text.writeString(output, Kmer.recoverKmerFrom(KMER,
+						tuple.getFieldData(0), tuple.getFieldStart(0),
+						tuple.getFieldLength(0)));
+				Text.writeString(output, "\t");
+				Text.writeString(output, Kmer.recoverAdjacent(tuple
+						.getFieldData(1)[tuple.getFieldStart(1)]));
 				Text.writeString(output, "\n");
 			} catch (IOException e) {
 				throw new HyracksDataException(e);
 			}
 		}
 	}
+
 	@Override
 	public ITupleWriter getTupleWriter() {
 		return new TupleWriter();
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
index b77720a..13ee200 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
@@ -10,7 +10,6 @@
 public class KMerWriterFactory implements ITupleWriterFactory {

 	private static final long serialVersionUID = 1L;

 

-

 	@Override

 	public ITupleWriter getTupleWriter() {

 		return new ITupleWriter() {

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
index 6d0ef74..0dfc69f 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
@@ -29,19 +29,7 @@
 	public ReadsKeyValueParserFactory(int k) {

 		this.k = k;

 		byteNum = (byte) Math.ceil((double) k / 4.0);

-		filter[0] = (byte) 0xC0;

-		filter[1] = (byte) 0xFC;

-		filter[2] = 0;

-

-		int r = byteNum * 8 - 2 * k;

-		r = 8 - r;

-		for (int i = 0; i < r; i++) {

-			filter[2] <<= 1;

-			filter[2] |= 1;

-		}

-		for(int i = 0; i < r-1 ; i++){

-			filter[3] <<= 1;

-		}

+		Kmer.initializeFilter(k, filter);

 	}

 

 	@Override

@@ -83,13 +71,9 @@
 

 					for (int i = 0; i < array.length - k + 1; i++) {

 						if (0 == i) {

-							bytes = Kmer.CompressKmer(k,array, i);

+							bytes = Kmer.CompressKmer(k, array, i);

 						} else {

-							Kmer.MoveKmer(k,bytes, array[i + k - 1], filter);

-							/*

-							 * l <<= 2; l &= window; l |= ConvertSymbol(array[i

-							 * + k - 1]);

-							 */

+							Kmer.MoveKmer(k, bytes, array[i + k - 1], filter);

 							pre = Kmer.GENE_CODE.getAdjBit(array[i - 1]);

 						}

 						if (i + k != array.length) {

@@ -102,17 +86,10 @@
 						r |= next;

 

 						tupleBuilder.reset();

-

-						// tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE,

-						// l);

-						tupleBuilder.addField(bytes, 0, byteNum + 1); 

+						tupleBuilder.addField(bytes, 0, byteNum + 1);

 						tupleBuilder.addField(

 								ByteSerializerDeserializer.INSTANCE, r);

 

-						// int[] a = tupleBuilder.getFieldEndOffsets();

-						// int b = tupleBuilder.getSize();

-						// byte[] c = tupleBuilder.getByteArray();

-

 						if (!outputAppender.append(

 								tupleBuilder.getFieldEndOffsets(),

 								tupleBuilder.getByteArray(), 0,

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index d6498e6..f979070 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -46,313 +46,365 @@
 

 public class Tester {

 

-    private static final Logger LOGGER = Logger.getLogger(Tester.class.getName());

-    public static final String NC1_ID = "nc1";

-    public static final String NC2_ID = "nc2";

-    public static final String NC3_ID = "nc3";

-    public static final String NC4_ID = "nc4";

+	private static final Logger LOGGER = Logger.getLogger(Tester.class

+			.getName());

+	public static final String NC1_ID = "nc1";

+	public static final String NC2_ID = "nc2";

+	public static final String NC3_ID = "nc3";

+	public static final String NC4_ID = "nc4";

 

-    private static ClusterControllerService cc;

-    private static NodeControllerService nc1;

-    private static NodeControllerService nc2;

-    private static NodeControllerService nc3;

-    private static NodeControllerService nc4;

-    private static IHyracksClientConnection hcc;

+	private static ClusterControllerService cc;

+	private static NodeControllerService nc1;

+	private static NodeControllerService nc2;

+	private static NodeControllerService nc3;

+	private static NodeControllerService nc4;

+	private static IHyracksClientConnection hcc;

 

-    //private static final boolean DEBUG = true;

+	// private static final boolean DEBUG = true;

 

-    public static void main(String[] args) throws Exception {

+	public static void main(String[] args) throws Exception {

 

-        LOGGER.setLevel(Level.OFF);

+		LOGGER.setLevel(Level.OFF);

 

-        init();

+		init();

 

-        // Options options = new Options();

+		// Options options = new Options();

 

-        IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);

+		IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);

 

-        /*

-         * JobSpecification job =

-         * createJob(parseFileSplits(options.inFileCustomerSplits),

-         * parseFileSplits(options.inFileOrderSplits),

-         * parseFileSplits(options.outFileSplits), options.numJoinPartitions,

-         * options.algo, options.graceInputSize, options.graceRecordsPerFrame,

-         * options.graceFactor, options.memSize, options.tableSize,

-         * options.hasGroupBy);

-         */

+		/*

+		 * JobSpecification job =

+		 * createJob(parseFileSplits(options.inFileCustomerSplits),

+		 * parseFileSplits(options.inFileOrderSplits),

+		 * parseFileSplits(options.outFileSplits), options.numJoinPartitions,

+		 * options.algo, options.graceInputSize, options.graceRecordsPerFrame,

+		 * options.graceFactor, options.memSize, options.tableSize,

+		 * options.hasGroupBy);

+		 */

 

-        int k, page_num;

-        String file_name = args[0];

-        k = Integer.parseInt(args[1]);

-        page_num = Integer.parseInt(args[2]);

-        int type = Integer.parseInt(args[3]);

+		int k, page_num;

+		String file_name = args[0];

+		k = Integer.parseInt(args[1]);

+		page_num = Integer.parseInt(args[2]);

+		int type = Integer.parseInt(args[3]);

 

-        JobSpecification job = createJob(file_name, k, page_num, type);

+		JobSpecification job = createJob(file_name, k, page_num, type);

 

-        long start = System.currentTimeMillis();

-        JobId jobId = hcc.startJob("test", job);

-        hcc.waitForCompletion(jobId);

-        long end = System.currentTimeMillis();

-        System.err.println(start + " " + end + " " + (end - start));

-        

-    /*   

+		long start = System.currentTimeMillis();

+		JobId jobId = hcc.startJob("test", job);

+		hcc.waitForCompletion(jobId);

+		long end = System.currentTimeMillis();

+		System.err.println(start + " " + end + " " + (end - start));

 

-        String s = "g:\\data\\results.txt" ;

+		/*

+		 * 

+		 * String s = "g:\\data\\results.txt" ;

+		 * 

+		 * filenames = new FileOutputStream(s); // filenames = new

+		 * FileInputStream("filename.txt");

+		 * 

+		 * BufferedWriter writer = new BufferedWriter(new

+		 * OutputStreamWriter(filenames)); writer.write((int) (end-start));

+		 * writer.close();

+		 */

 

-        filenames = new FileOutputStream(s);

-        // filenames = new FileInputStream("filename.txt");

+	}

 

-        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(filenames));

-        writer.write((int) (end-start));

-        writer.close();*/

-        

-    }

+	public static void init() throws Exception {

+		CCConfig ccConfig = new CCConfig();

+		ccConfig.clientNetIpAddress = "127.0.0.1";

+		ccConfig.clientNetPort = 39000;

+		ccConfig.clusterNetIpAddress = "127.0.0.1";

+		ccConfig.clusterNetPort = 39001;

+		ccConfig.profileDumpPeriod = -1;

+		File outDir = new File("target/ClusterController");

+		outDir.mkdirs();

+		File ccRoot = File.createTempFile(Tester.class.getName(), ".data",

+				outDir);

+		ccRoot.delete();

+		ccRoot.mkdir();

+		ccConfig.ccRoot = ccRoot.getAbsolutePath();

+		cc = new ClusterControllerService(ccConfig);

+		cc.start();

+		ccConfig.defaultMaxJobAttempts = 0;

 

-    public static void init() throws Exception {

-        CCConfig ccConfig = new CCConfig();

-        ccConfig.clientNetIpAddress = "127.0.0.1";

-        ccConfig.clientNetPort = 39000;

-        ccConfig.clusterNetIpAddress = "127.0.0.1";

-        ccConfig.clusterNetPort = 39001;

-        ccConfig.profileDumpPeriod = -1;

-        File outDir = new File("target/ClusterController");

-        outDir.mkdirs();

-        File ccRoot = File.createTempFile(Tester.class.getName(), ".data", outDir);

-        ccRoot.delete();

-        ccRoot.mkdir();

-        ccConfig.ccRoot = ccRoot.getAbsolutePath();

-        cc = new ClusterControllerService(ccConfig);

-        cc.start();

-        ccConfig.defaultMaxJobAttempts = 0;

+		NCConfig ncConfig1 = new NCConfig();

+		ncConfig1.ccHost = "localhost";

+		ncConfig1.ccPort = 39001;

+		ncConfig1.clusterNetIPAddress = "127.0.0.1";

+		ncConfig1.dataIPAddress = "127.0.0.1";

+		ncConfig1.nodeId = NC1_ID;

+		nc1 = new NodeControllerService(ncConfig1);

+		nc1.start();

 

-        NCConfig ncConfig1 = new NCConfig();

-        ncConfig1.ccHost = "localhost";

-        ncConfig1.ccPort = 39001;

-        ncConfig1.clusterNetIPAddress = "127.0.0.1";

-        ncConfig1.dataIPAddress = "127.0.0.1";

-        ncConfig1.nodeId = NC1_ID;

-        nc1 = new NodeControllerService(ncConfig1);

-        nc1.start();

+		/*

+		 * NCConfig ncConfig2 = new NCConfig(); ncConfig2.ccHost = "localhost";

+		 * ncConfig2.ccPort = 39001; ncConfig2.clusterNetIPAddress =

+		 * "127.0.0.1"; ncConfig2.dataIPAddress = "127.0.0.1"; ncConfig2.nodeId

+		 * = NC2_ID; nc2 = new NodeControllerService(ncConfig2); nc2.start();

+		 * 

+		 * NCConfig ncConfig3 = new NCConfig(); ncConfig3.ccHost = "localhost";

+		 * ncConfig3.ccPort = 39001; ncConfig3.clusterNetIPAddress =

+		 * "127.0.0.1"; ncConfig3.dataIPAddress = "127.0.0.1"; ncConfig3.nodeId

+		 * = NC3_ID; nc3 = new NodeControllerService(ncConfig3); nc3.start();

+		 * 

+		 * NCConfig ncConfig4 = new NCConfig(); ncConfig4.ccHost = "localhost";

+		 * ncConfig4.ccPort = 39001; ncConfig4.clusterNetIPAddress =

+		 * "127.0.0.1"; ncConfig4.dataIPAddress = "127.0.0.1"; ncConfig4.nodeId

+		 * = NC4_ID; nc4 = new NodeControllerService(ncConfig4); nc4.start();

+		 */

 

-  /*      NCConfig ncConfig2 = new NCConfig();

-        ncConfig2.ccHost = "localhost";

-        ncConfig2.ccPort = 39001;

-        ncConfig2.clusterNetIPAddress = "127.0.0.1";

-        ncConfig2.dataIPAddress = "127.0.0.1";

-        ncConfig2.nodeId = NC2_ID;

-        nc2 = new NodeControllerService(ncConfig2);

-        nc2.start();

-        

-        NCConfig ncConfig3 = new NCConfig();

-        ncConfig3.ccHost = "localhost";

-        ncConfig3.ccPort = 39001;

-        ncConfig3.clusterNetIPAddress = "127.0.0.1";

-        ncConfig3.dataIPAddress = "127.0.0.1";

-        ncConfig3.nodeId = NC3_ID;

-        nc3 = new NodeControllerService(ncConfig3);

-        nc3.start();

-        

-        NCConfig ncConfig4 = new NCConfig();

-        ncConfig4.ccHost = "localhost";

-        ncConfig4.ccPort = 39001;

-        ncConfig4.clusterNetIPAddress = "127.0.0.1";

-        ncConfig4.dataIPAddress = "127.0.0.1";

-        ncConfig4.nodeId = NC4_ID;

-        nc4 = new NodeControllerService(ncConfig4);

-        nc4.start();*/

+		hcc = new HyracksConnection(ccConfig.clientNetIpAddress,

+				ccConfig.clientNetPort);

+		hcc.createApplication("test", null);

+		if (LOGGER.isLoggable(Level.INFO)) {

+			LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());

+		}

+	}

 

-        hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);

-        hcc.createApplication("test", null);

-        if (LOGGER.isLoggable(Level.INFO)) {

-            LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());

-        }

-    }

+	private static JobSpecification createJob(String filename, int k,

+			int page_num, int type) throws HyracksDataException {

+		JobSpecification spec = new JobSpecification();

 

-    private static JobSpecification createJob(String filename, int k, int page_num, int type) throws HyracksDataException {

-        JobSpecification spec = new JobSpecification();

+		// spec.setFrameSize(32768);

+		spec.setFrameSize(32768);

 

-        //spec.setFrameSize(32768);

-        spec.setFrameSize(32768);

+		FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);

+		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,

+		// NC1_ID, NC2_ID,NC3_ID,NC4_ID);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,

+				NC1_ID);

 

-        FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);

+		RecordDescriptor outputRec = new RecordDescriptor(

+				new ISerializerDeserializer[] { null,

+						ByteSerializerDeserializer.INSTANCE,

+						ByteSerializerDeserializer.INSTANCE });

+		// Integer64SerializerDeserializer.INSTANCE,

+		// ByteSerializerDeserializer.INSTANCE,

+		// ByteSerializerDeserializer.INSTANCE });

 

-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {null, ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE});

-                //Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,

-                //ByteSerializerDeserializer.INSTANCE });

+		int[] keyFields = new int[] { 0 };

+		int frameLimits = 4096; // hyracks oriented

+		int tableSize = 10485767; // hyracks oriented

 

-       int[] keyFields = new int[] { 0 };

-        int frameLimits = 4096;		// hyracks oriented

-        int tableSize = 10485767;	// hyracks oriented

+		AbstractOperatorDescriptor single_grouper;

+		IConnectorDescriptor conn_partition;

+		AbstractOperatorDescriptor cross_grouper;

 

-        AbstractOperatorDescriptor single_grouper;

-        IConnectorDescriptor conn_partition;

-        AbstractOperatorDescriptor cross_grouper;

+		if (0 == type) {// external group by

+			single_grouper = new ExternalGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new VLongNormalizedKeyComputerFactory(),

+					new MergeKmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

 

-        

-        if(0 == type){//external group by

-            single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

-                    new VLongNormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					outputRec,

+					new HashSpillableTableFactory(

+							new FieldHashPartitionComputerFactory(

+									keyFields,

+									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+											.of(VLongPointable.FACTORY) }),

+							tableSize), true);

 

-                    new DistributedMergeLmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

-                    outputRec, new HashSpillableTableFactory(

-                            new FieldHashPartitionComputerFactory(keyFields,

-                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-                                            .of(VLongPointable.FACTORY) }), tableSize), true);

-        

-            conn_partition = new MToNPartitioningConnectorDescriptor(spec,

-                    new KmerHashPartitioncomputerFactory());

-            cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

-                    new VLongNormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

+			conn_partition = new MToNPartitioningConnectorDescriptor(spec,

+					new KmerHashPartitioncomputerFactory());

+			cross_grouper = new ExternalGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new VLongNormalizedKeyComputerFactory(),

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

 

-                    new DistributedMergeLmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

-                    outputRec, new HashSpillableTableFactory(

-                            new FieldHashPartitionComputerFactory(keyFields,

-                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-                                            .of(VLongPointable.FACTORY) }), tableSize), true);

-        }

-        else if( 1 == type){

-            single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

-                    new VLongNormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

-                    new DistributedMergeLmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

-                    outputRec, new HashSpillableTableFactory(

-                            new FieldHashPartitionComputerFactory(keyFields,

-                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-                                            .of(VLongPointable.FACTORY) }), tableSize), true);

-            conn_partition = new  MToNPartitioningMergingConnectorDescriptor(spec, new KmerHashPartitioncomputerFactory(), 

-                  keyFields, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY)} );

-            cross_grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields, 

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },  

-                    new DistributedMergeLmerAggregateFactory(), 

-                    outputRec);

-        }

-        else{

-            long inputSizeInRawRecords = 154000000;

-            long inputSizeInUniqueKeys = 38500000;

-            int recordSizeInBytes = 4;

-            int hashfuncStartLevel = 1;

-            single_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

-                    new IBinaryHashFunctionFamily[] {new VLongBinaryHashFunctionFamily()},

-                    //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

-                    hashfuncStartLevel, 

-                    new VLongNormalizedKeyComputerFactory(),

-                    new MergeKmerAggregateFactory(),

-                    new DistributedMergeLmerAggregateFactory(),

-                    outputRec, true);

-            conn_partition = new MToNPartitioningConnectorDescriptor(spec,

-                    new KmerHashPartitioncomputerFactory());

-            recordSizeInBytes = 13;

-            cross_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

-                    new IBinaryHashFunctionFamily[] {new VLongBinaryHashFunctionFamily()},

-                    //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

-                    hashfuncStartLevel, 

-                    new VLongNormalizedKeyComputerFactory(),

-                    new DistributedMergeLmerAggregateFactory(),

-                    new DistributedMergeLmerAggregateFactory(),

-                    outputRec, true);            

-        }

-        

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID);

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

-        

-        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(spec);

-        spec.connect(readfileConn, scan, 0, single_grouper, 0);

-        

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					outputRec,

+					new HashSpillableTableFactory(

+							new FieldHashPartitionComputerFactory(

+									keyFields,

+									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+											.of(VLongPointable.FACTORY) }),

+							tableSize), true);

+		} else if (1 == type) {

+			single_grouper = new ExternalGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new VLongNormalizedKeyComputerFactory(),

+					new MergeKmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					outputRec,

+					new HashSpillableTableFactory(

+							new FieldHashPartitionComputerFactory(

+									keyFields,

+									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+											.of(VLongPointable.FACTORY) }),

+							tableSize), true);

+			conn_partition = new MToNPartitioningMergingConnectorDescriptor(

+					spec,

+					new KmerHashPartitioncomputerFactory(),

+					keyFields,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) });

+			cross_grouper = new PreclusteredGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new DistributedMergeLmerAggregateFactory(), outputRec);

+		} else {

+			long inputSizeInRawRecords = 154000000;

+			long inputSizeInUniqueKeys = 38500000;

+			int recordSizeInBytes = 4;

+			int hashfuncStartLevel = 1;

+			single_grouper = new HybridHashGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					inputSizeInRawRecords,

+					inputSizeInUniqueKeys,

+					recordSizeInBytes,

+					tableSize,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },

+					// new IBinaryHashFunctionFamily[]

+					// {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

+					hashfuncStartLevel,

+					new VLongNormalizedKeyComputerFactory(),

+					new MergeKmerAggregateFactory(),

+					new DistributedMergeLmerAggregateFactory(), outputRec, true);

+			conn_partition = new MToNPartitioningConnectorDescriptor(spec,

+					new KmerHashPartitioncomputerFactory());

+			recordSizeInBytes = 13;

+			cross_grouper = new HybridHashGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					inputSizeInRawRecords,

+					inputSizeInUniqueKeys,

+					recordSizeInBytes,

+					tableSize,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },

+					// new IBinaryHashFunctionFamily[]

+					// {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

+					hashfuncStartLevel,

+					new VLongNormalizedKeyComputerFactory(),

+					new DistributedMergeLmerAggregateFactory(),

+					new DistributedMergeLmerAggregateFactory(), outputRec, true);

+		}

 

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper,NC1_ID);

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

-        spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+				single_grouper, NC1_ID);

+		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+		// single_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

 

-        //PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);

-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec, "G:\\data\\result");

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);

+		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(

+				spec);

+		spec.connect(readfileConn, scan, 0, single_grouper, 0);

 

-        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);

-        spec.connect(printConn, cross_grouper, 0, printer, 0);

-        //spec.connect(readfileConn, scan, 0, printer, 0);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+				cross_grouper, NC1_ID);

+		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+		// cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

+		spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);

 

-        spec.addRoot(printer);

+		// PrinterOperatorDescriptor printer = new

+		// PrinterOperatorDescriptor(spec);

+		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec,

+				"G:\\data\\result");

+		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+		// printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,

+				NC1_ID);

 

-        if( 1 == type ){

-            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());

-        }

-        // System.out.println(spec.toString());

-        return spec;

-    }

+		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);

+		spec.connect(printConn, cross_grouper, 0, printer, 0);

+		// spec.connect(readfileConn, scan, 0, printer, 0);

 

+		spec.addRoot(printer);

 

-    static class JoinComparatorFactory implements ITuplePairComparatorFactory {

-        private static final long serialVersionUID = 1L;

+		if (1 == type) {

+			spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());

+		}

+		// System.out.println(spec.toString());

+		return spec;

+	}

 

-        private final IBinaryComparatorFactory bFactory;

-        private final int pos0;

-        private final int pos1;

+	static class JoinComparatorFactory implements ITuplePairComparatorFactory {

+		private static final long serialVersionUID = 1L;

 

-        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {

-            this.bFactory = bFactory;

-            this.pos0 = pos0;

-            this.pos1 = pos1;

-        }

+		private final IBinaryComparatorFactory bFactory;

+		private final int pos0;

+		private final int pos1;

 

-        @Override

-        public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {

-            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);

-        }

-    }

+		public JoinComparatorFactory(IBinaryComparatorFactory bFactory,

+				int pos0, int pos1) {

+			this.bFactory = bFactory;

+			this.pos0 = pos0;

+			this.pos1 = pos1;

+		}

 

-    static class JoinComparator implements ITuplePairComparator {

+		@Override

+		public ITuplePairComparator createTuplePairComparator(

+				IHyracksTaskContext ctx) {

+			return new JoinComparator(bFactory.createBinaryComparator(), pos0,

+					pos1);

+		}

+	}

 

-        private final IBinaryComparator bComparator;

-        private final int field0;

-        private final int field1;

+	static class JoinComparator implements ITuplePairComparator {

 

-        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {

-            this.bComparator = bComparator;

-            this.field0 = field0;

-            this.field1 = field1;

-        }

+		private final IBinaryComparator bComparator;

+		private final int field0;

+		private final int field1;

 

-        @Override

-        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {

-            int tStart0 = accessor0.getTupleStartOffset(tIndex0);

-            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;

+		public JoinComparator(IBinaryComparator bComparator, int field0,

+				int field1) {

+			this.bComparator = bComparator;

+			this.field0 = field0;

+			this.field1 = field1;

+		}

 

-            int tStart1 = accessor1.getTupleStartOffset(tIndex1);

-            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;

+		@Override

+		public int compare(IFrameTupleAccessor accessor0, int tIndex0,

+				IFrameTupleAccessor accessor1, int tIndex1) {

+			int tStart0 = accessor0.getTupleStartOffset(tIndex0);

+			int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;

 

-            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);

-            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);

-            int fLen0 = fEnd0 - fStart0;

+			int tStart1 = accessor1.getTupleStartOffset(tIndex1);

+			int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;

 

-            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);

-            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);

-            int fLen1 = fEnd1 - fStart1;

+			int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);

+			int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);

+			int fLen0 = fEnd0 - fStart0;

 

-            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1

-                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);

-            if (c != 0) {

-                return c;

-            }

-            return 0;

-        }

-    }

+			int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);

+			int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);

+			int fLen1 = fEnd1 - fStart1;

+

+			int c = bComparator.compare(accessor0.getBuffer().array(), fStart0

+					+ fStartOffset0, fLen0, accessor1.getBuffer().array(),

+					fStart1 + fStartOffset1, fLen1);

+			if (c != 0) {

+				return c;

+			}

+			return 0;

+		}

+	}

 

 }

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index d7ab7f7..ba61d18 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -16,7 +16,7 @@
 

 /**

  * sum

- *

+ * 

  */

 public class DistributedMergeLmerAggregateFactory implements

 		IAggregatorDescriptorFactory {

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index 76b04d1..427ad4f 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -16,7 +16,7 @@
 

 /**

  * count

- *

+ * 

  */

 public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {

 	private static final long serialVersionUID = 1L;

@@ -100,7 +100,7 @@
 						+ stateAccessor.getFieldSlotsLength() + statefieldStart;

 

 				count += 1;

-				if(count > max){

+				if (count > max) {

 					count = max;

 				}

 

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
index b2ffb41..12c72d8 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -7,7 +7,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -129,18 +128,16 @@
 
 	public static void main(String[] args) throws Exception {
 		GenomixJob job = new GenomixJob();
-		String[] otherArgs = new GenericOptionsParser(job,
-				args).getRemainingArgs();
+		String[] otherArgs = new GenericOptionsParser(job, args)
+				.getRemainingArgs();
 		if (otherArgs.length < 4) {
 			System.err.println("Need <serverIP> <port> <input> <output>");
 			System.exit(-1);
 		}
 		String ipAddress = otherArgs[0];
 		int port = Integer.parseInt(otherArgs[1]);
-		int numOfDuplicate = job.getInt(
-				CPARTITION_PER_MACHINE, 2);
-		boolean bProfiling = job.getBoolean(IS_PROFILING,
-				true);
+		int numOfDuplicate = job.getInt(CPARTITION_PER_MACHINE, 2);
+		boolean bProfiling = job.getBoolean(IS_PROFILING, true);
 		FileInputFormat.setInputPaths(new Job(job), otherArgs[2]);
 		FileOutputFormat.setOutputPath(new Job(job), new Path(otherArgs[3]));
 		Driver driver = new Driver(ipAddress, port, numOfDuplicate);
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
index e065b8e..55912c6 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
@@ -3,7 +3,6 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
 
 public class GenomixJob extends Configuration {
 
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index b7e703e..a2e8860 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -15,12 +15,9 @@
 import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.genomix.dataflow.KMerSequenceWriterFactory;
 import edu.uci.ics.genomix.dataflow.KMerTextWriterFactory;
-import edu.uci.ics.genomix.dataflow.KMerWriterFactory;
 import edu.uci.ics.genomix.dataflow.ReadsKeyValueParserFactory;
 import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
 import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
-import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.genomix.job.JobGen;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -34,7 +31,6 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
@@ -54,8 +50,9 @@
 	public enum GroupbyType {
 		EXTERNAL, PRECLUSTER, HYBRIDHASH,
 	}
-	public enum OutputFormat{
-		TEXT,BINARY,
+
+	public enum OutputFormat {
+		TEXT, BINARY,
 	}
 
 	JobConf job;
@@ -73,7 +70,8 @@
 	private AbstractOperatorDescriptor singleGrouper;
 	private IConnectorDescriptor connPartition;
 	private AbstractOperatorDescriptor crossGrouper;
-	private RecordDescriptor outputRec;
+	private RecordDescriptor readOutputRec;
+	private RecordDescriptor combineOutputRec;
 
 	public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
 			final Map<String, NodeControllerInfo> ncMap,
@@ -103,7 +101,7 @@
 				new VLongNormalizedKeyComputerFactory(),
 				aggeragater,
 				new DistributedMergeLmerAggregateFactory(),
-				outputRec,
+				combineOutputRec,
 				new HashSpillableTableFactory(
 						new FieldHashPartitionComputerFactory(
 								keyFields,
@@ -128,10 +126,10 @@
 				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
 						.of(VLongPointable.FACTORY) },
 				new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
-				hashfuncStartLevel,
-				new VLongNormalizedKeyComputerFactory(),
+				hashfuncStartLevel, new VLongNormalizedKeyComputerFactory(),
 				new MergeKmerAggregateFactory(),
-				new DistributedMergeLmerAggregateFactory(), outputRec, true);
+				new DistributedMergeLmerAggregateFactory(), combineOutputRec,
+				true);
 	}
 
 	private void generateDescriptorbyType(JobSpecification jobSpec)
@@ -161,7 +159,8 @@
 					keyFields,
 					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
 							.of(VLongPointable.FACTORY) },
-					new DistributedMergeLmerAggregateFactory(), outputRec);
+					new DistributedMergeLmerAggregateFactory(),
+					combineOutputRec);
 			break;
 		case HYBRIDHASH:
 		default:
@@ -192,14 +191,13 @@
 	public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
 			throws HyracksDataException {
 		try {
-			
-			InputSplit[] splits = job.getInputFormat().getSplits(
-					job, ncNodeNames.length);
+
+			InputSplit[] splits = job.getInputFormat().getSplits(job,
+					ncNodeNames.length);
 
 			String[] readSchedule = scheduler.getLocationConstraints(splits);
-			return new HDFSReadOperatorDescriptor(jobSpec, outputRec,
-					job, splits, readSchedule,
-					new ReadsKeyValueParserFactory(kmers));
+			return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job,
+					splits, readSchedule, new ReadsKeyValueParserFactory(kmers));
 		} catch (Exception e) {
 			throw new HyracksDataException(e);
 		}
@@ -209,9 +207,12 @@
 	public JobSpecification generateJob() throws HyracksException {
 
 		JobSpecification jobSpec = new JobSpecification();
-		outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+		readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+				null, ByteSerializerDeserializer.INSTANCE });
+		combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
 				null, ByteSerializerDeserializer.INSTANCE,
 				ByteSerializerDeserializer.INSTANCE });
+
 		// File input
 		HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
 
@@ -232,11 +233,12 @@
 
 		// Output
 		ITupleWriterFactory writer = null;
-		switch (outputFormat){
+		switch (outputFormat) {
 		case TEXT:
 			writer = new KMerTextWriterFactory(kmers);
 			break;
-		case BINARY: default:
+		case BINARY:
+		default:
 			writer = new KMerSequenceWriterFactory(job);
 			break;
 		}
@@ -248,9 +250,7 @@
 
 		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
 				jobSpec);
-//		jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
-		jobSpec.connect(printConn, readOperator, 0, writeOperator, 0);
-//		jobSpec.addRoot(readOperator);
+		jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
 		jobSpec.addRoot(writeOperator);
 
 		if (groupbyType == GroupbyType.PRECLUSTER) {
@@ -261,7 +261,7 @@
 
 	@Override
 	protected void initJobConfiguration() {
-		
+
 		kmers = conf.getInt(GenomixJob.KMER_LENGTH, 25);
 		frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, 4096);
 		tableSize = conf.getInt(GenomixJob.TABLE_SIZE, 10485767);
@@ -274,11 +274,11 @@
 		} else {
 			groupbyType = GroupbyType.HYBRIDHASH;
 		}
-		
+
 		String output = conf.get(GenomixJob.OUTPUT_FORMAT, "binary");
-		if (output.equalsIgnoreCase("binary")){
+		if (output.equalsIgnoreCase("binary")) {
 			outputFormat = OutputFormat.BINARY;
-		} else if ( output.equalsIgnoreCase("text")){
+		} else if (output.equalsIgnoreCase("text")) {
 			outputFormat = OutputFormat.TEXT;
 		} else {
 			outputFormat = OutputFormat.TEXT;
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java
index 6570509..0281ebc 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/Kmer.java
@@ -6,122 +6,129 @@
 
 import org.apache.hadoop.io.Writable;
 
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+public class Kmer {
 
-public class Kmer implements Writable {
-	
-	public final static byte[] GENE_SYMBOL = {'A','C','G','T'};
-	public final static class GENE_CODE{
-		
-		public static final byte A=0;
-		public static final byte C=1;
-		public static final byte G=2;
-		public static final byte T=3;
-		
-		public static byte getCodeFromSymbol(byte ch){
+	public final static byte[] GENE_SYMBOL = { 'A', 'C', 'G', 'T' };
+
+	public final static class GENE_CODE {
+
+		/**
+		 * make sure this 4 ids equal to the sequence id of char in {@GENE_SYMBOL}
+		 */
+		public static final byte A = 0;
+		public static final byte C = 1;
+		public static final byte G = 2;
+		public static final byte T = 3;
+
+		public static byte getCodeFromSymbol(byte ch) {
 			byte r = 0;
 			switch (ch) {
-			case 'A':case 'a':
+			case 'A':
+			case 'a':
 				r = A;
 				break;
-			case 'C':case 'c':
+			case 'C':
+			case 'c':
 				r = C;
 				break;
-			case 'G':case 'g':
+			case 'G':
+			case 'g':
 				r = G;
 				break;
-			case 'T':case 't':
+			case 'T':
+			case 't':
 				r = T;
 				break;
 			}
 			return r;
 		}
-		
-		public static byte getSymbolFromCode(byte code){
-			if (code > 3){
+
+		public static byte getSymbolFromCode(byte code) {
+			if (code > 3) {
 				return '!';
 			}
 			return GENE_SYMBOL[code];
 		}
-		
+
 		public static byte getAdjBit(byte t) {
 			byte r = 0;
 			switch (t) {
-			case 'A':case 'a':
+			case 'A':
+			case 'a':
 				r = 1 << A;
 				break;
-			case 'C':case 'c':
+			case 'C':
+			case 'c':
 				r = 1 << C;
 				break;
-			case 'G':case 'g':
+			case 'G':
+			case 'g':
 				r = 1 << G;
 				break;
-			case 'T':case 't':
+			case 'T':
+			case 't':
 				r = 1 << T;
 				break;
 			}
 			return r;
 		}
-	}
-
-	public static final byte LOWBITMASK = 0x03;
-	
-	public static String recoverKmerFrom(int k, byte [] keyData, int keyStart, int keyLength ) {
-		StringBuilder sblder = new StringBuilder();
-
-		int outKmer = 0;
-		for ( int i = keyLength-1; i>=0; i--){
-			byte last = keyData[keyStart + i];
-			for( int j = 0; j < 8; j+=2){
-				byte kmer = (byte) ((last >>j) & LOWBITMASK);
-				sblder.append((char)GENE_CODE.getSymbolFromCode(kmer));
-				if ( ++outKmer > k){
-					break;
+		
+		public static String getSymbolFromBitMap(byte code) {
+			int left = (code >> 4) & 0x0F;
+			int right = code & 0x0F;
+			String str = new String();
+			for(int i = A; i <= T ; i++){
+				if ( (left & (1<<i)) != 0){
+					str += GENE_SYMBOL[i];
 				}
 			}
-			if(outKmer >k){
-				break;
+			str += '|';
+			for (int i = A; i <= T; i++) {
+				if ((right & (1 << i)) != 0) {
+					str += (char) GENE_SYMBOL[i];
+				}
 			}
+			return str;
 		}
-		return sblder.toString();
 	}
-	
-	public static String recoverAdjacent(byte number){
+
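+	/**
+	 * Dump the packed k-mer bytes as raw integer values (the first byte, then
+	 * the remaining gene-code bytes); useful for inspecting the binary layout.
+	 */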
+	public static String recoverKmerFrom(int k, byte[] keyData, int keyStart,
+			int keyLength) {
+		byte kmer = keyData[keyStart];
+
+		StringBuilder sblder = new StringBuilder();
+		sblder.append((int) kmer).append(' ');
+		for (int i = keyStart + 1; i < keyStart + keyLength; i++) {
+			byte genecode = keyData[i];
+			sblder.append((int) genecode).append(' ');
+		}
+		return sblder.toString();
+	}
+
+	public static String recoverAdjacent(byte number) {
 		int incoming = (number & 0xF0) >> 4;
 		int outgoing = number & 0x0F;
 		return String.valueOf(incoming) + '|' + String.valueOf(outgoing);
 	}
-	
-	@Override
-	public void readFields(DataInput arg0) throws IOException {
-		// TODO Auto-generated method stub
 
-	}
 
-	@Override
-	public void write(DataOutput arg0) throws IOException {
-		// TODO Auto-generated method stub
-
-	}
-	
-	public static void initializeFilter(int k, byte []filter){
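+	/**
+	 * Precompute the bit masks used by MoveKmer when shifting the packed
+	 * k-mer by one 2-bit gene code; filter[2] and filter[3] depend on how
+	 * many bits of the first payload byte a k-mer of length k actually uses.
+	 */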
+	public static void initializeFilter(int k, byte[] filter) {
 		filter[0] = (byte) 0xC0;
 		filter[1] = (byte) 0xFC;
 		filter[2] = 0;
 		filter[3] = 3;
 		final int byteNum = (byte) Math.ceil((double) k / 4.0);
-		
+
 		int r = byteNum * 8 - 2 * k;
 		r = 8 - r;
 		for (int i = 0; i < r; i++) {
 			filter[2] <<= 1;
 			filter[2] |= 1;
 		}
-		for(int i = 0; i < r-1 ; i++){
+		for (int i = 0; i < r - 1; i++) {
 			filter[3] <<= 1;
 		}
 	}
-	
+
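+	/**
+	 * Pack the k symbols of array (starting at start) into a byte array,
+	 * two bits per symbol, using the codes defined in GENE_CODE.
+	 */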
 	public static byte[] CompressKmer(int k, byte[] array, int start) {
 		final int byteNum = (byte) Math.ceil((double) k / 4.0);
 		byte[] bytes = new byte[byteNum + 1];
@@ -131,26 +138,27 @@
 		int count = 0;
 		int bcount = 0;
 
-		for (int i = start; i < start+k ; i++) {
-			l = (byte) ((l<<2) & 0xFC);
+		for (int i = start; i < start + k; i++) {
+			l = (byte) ((l << 2) & 0xFC);
 			l |= GENE_CODE.getCodeFromSymbol(array[i]);
 			count += 2;
 			if (count % 8 == 0 && byteNum - bcount > 1) {
-				bytes[byteNum-bcount] = l;
+				bytes[byteNum - bcount] = l;
 				bcount += 1;
 				count = 0;
 				l = 0;
 			}
-			if (byteNum - bcount <= 1){
+			if (byteNum - bcount <= 1) {
 				break;
 			}
 		}
 		bytes[1] = l;
 		return bytes;
 	}
-	
+
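+	/**
+	 * Slide the packed k-mer by one symbol: shift each payload byte left by
+	 * two bits (using the masks from initializeFilter) and append the code
+	 * of the new symbol c.
+	 */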
 	public static void MoveKmer(int k, byte[] bytes, byte c, byte filter[]) {
-		int i = (byte) Math.ceil((double) k / 4.0);;
+		int i = (byte) Math.ceil((double) k / 4.0);
 		bytes[i] <<= 2;
 		bytes[i] &= filter[1];
 		i -= 1;
@@ -163,11 +171,10 @@
 			bytes[i] &= filter[1];
 			i -= 1;
 		}
-		bytes[2] |= (byte) (bytes[1]&filter[3]);
-		bytes[1] <<=2;
+		bytes[2] |= (byte) (bytes[1] & filter[3]);
+		bytes[1] <<= 2;
 		bytes[1] &= filter[2];
 		bytes[1] |= GENE_CODE.getCodeFromSymbol(c);
 	}
 
-
 }
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
new file mode 100644
index 0000000..e943798
--- /dev/null
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/type/KmerCountValue.java
@@ -0,0 +1,58 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.SequenceFile.ValueBytes;
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
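+/**
+ * Per-kmer value pairing the adjacency bitmap with its count; implements
+ * Writable and SequenceFile.ValueBytes so it can be written to and read
+ * back from a SequenceFile.
+ */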
+public class KmerCountValue implements ValueBytes, Writable {
+	private byte adjBitMap;
+	private byte count;
+
+	public KmerCountValue(ITupleReference tuple) {
+		adjBitMap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
+		count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
+	}
+
+	/** No-arg constructor, needed when instances are created reflectively (e.g. by ReflectionUtils.newInstance). */
+	public KmerCountValue() {
+	}
+
+	@Override
+	public int getSize() {
+		return 2;
+	}
+
+	@Override
+	public void writeCompressedBytes(DataOutputStream arg0)
+			throws IllegalArgumentException, IOException {
+		arg0.writeByte(adjBitMap);
+		arg0.writeByte(count);
+	}
+
+	@Override
+	public void writeUncompressedBytes(DataOutputStream arg0)
+			throws IOException {
+		arg0.writeByte(adjBitMap);
+		arg0.writeByte(count);
+	}
+
+	@Override
+	public void readFields(DataInput arg0) throws IOException {
+		adjBitMap = arg0.readByte();
+		count = arg0.readByte();
+	}
+
+	@Override
+	public void write(DataOutput arg0) throws IOException {
+		arg0.writeByte(adjBitMap);
+		arg0.writeByte(count);
+	}
+
+	@Override
+	public String toString() {
+		return Kmer.GENE_CODE.getSymbolFromBitMap(adjBitMap) + '\t' + String.valueOf(count);
+	}
+
+}
\ No newline at end of file
diff --git a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
index a6ff826..788ebca 100644
--- a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
+++ b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
@@ -1,25 +1,26 @@
 package edu.uci.ics.genomix.example.jobrun;
 
+import java.io.BufferedWriter;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.FileWriter;
 import java.io.IOException;
 
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -27,30 +28,9 @@
 import edu.uci.ics.genomix.driver.Driver;
 import edu.uci.ics.genomix.driver.Driver.Plan;
 import edu.uci.ics.genomix.job.GenomixJob;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
-import edu.uci.ics.hyracks.hdfs.lib.RawBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.hdfs.lib.TextKeyValueParserFactory;
-import edu.uci.ics.hyracks.hdfs.lib.TextTupleWriterFactory;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+import edu.uci.ics.genomix.type.KmerCountValue;
 import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
 import edu.uci.ics.hyracks.hdfs.utils.TestUtils;
-import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 
 public class JobRunTestCase {
 	private static final String ACTUAL_RESULT_DIR = "actual";
@@ -60,8 +40,10 @@
 	private static final String HDFS_INPUT_PATH = "/webmap";
 	private static final String HDFS_OUTPUT_PATH = "/webmap_result/";
 
-    private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
-    private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
+	private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR
+			+ HDFS_OUTPUT_PATH + "/merged.txt";
+	private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
+	private static final String EXPECTED_PATH = "src/test/resources/expected/result2";
 
 	private static final String HYRACKS_APP_NAME = "genomix";
 	private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
@@ -122,14 +104,14 @@
 		confOutput.flush();
 		confOutput.close();
 	}
-	
-	private void cleanUpReEntry() throws IOException{
+
+	private void cleanUpReEntry() throws IOException {
 		FileSystem lfs = FileSystem.getLocal(new Configuration());
-		if (lfs.exists(new Path(DUMPED_RESULT))){
+		if (lfs.exists(new Path(DUMPED_RESULT))) {
 			lfs.delete(new Path(DUMPED_RESULT), true);
 		}
 		FileSystem dfs = FileSystem.get(conf);
-		if (dfs.exists(new Path(HDFS_OUTPUT_PATH))){
+		if (dfs.exists(new Path(HDFS_OUTPUT_PATH))) {
 			dfs.delete(new Path(HDFS_OUTPUT_PATH), true);
 		}
 	}
@@ -138,12 +120,12 @@
 	public void TestExternalGroupby() throws Exception {
 		cleanUpReEntry();
 		conf.set(GenomixJob.GROUPBY_TYPE, "external");
-		conf.set(GenomixJob.OUTPUT_FORMAT, "text");
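+		// the job now emits a binary SequenceFile; checkResults converts it to text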
+		conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
 		driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
 		Assert.assertEquals(true, checkResults());
 	}
 
-	//@Test
+	// @Test
 	public void TestPreClusterGroupby() throws Exception {
 		cleanUpReEntry();
 		conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
@@ -151,19 +133,36 @@
 		Assert.assertEquals(true, checkResults());
 	}
 
-	//@Test
+	// @Test
 	public void TestHybridGroupby() throws Exception {
 		cleanUpReEntry();
 		conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
-		conf.set(GenomixJob.OUTPUT_FORMAT, "text");
+		conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
 		driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
 		Assert.assertEquals(true, checkResults());
 	}
 
 	private boolean checkResults() throws Exception {
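+		// merge the HDFS output locally, convert the binary SequenceFile dump
+		// to text, and compare the text with the expected result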
-		FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH), FileSystem.getLocal(new Configuration()), new Path(DUMPED_RESULT), false, conf, null);
-		TestUtils.compareWithResult(new File(EXPECTED_PATH
-				), new File(DUMPED_RESULT));
+		FileUtil.copyMerge(FileSystem.get(conf), new Path(HDFS_OUTPUT_PATH),
+				FileSystem.getLocal(new Configuration()), new Path(
+						DUMPED_RESULT), false, conf, null);
+		
+		Path path = new Path(DUMPED_RESULT);
+		FileSystem dfs = FileSystem.get(conf);
+		SequenceFile.Reader reader = new SequenceFile.Reader(dfs, path, conf);
+		BytesWritable key = (BytesWritable) ReflectionUtils.newInstance(
+				reader.getKeyClass(), conf);
+		KmerCountValue value = (KmerCountValue) ReflectionUtils.newInstance(
+				reader.getValueClass(), conf);
+		File filePathTo = new File(CONVERT_RESULT);
+		BufferedWriter bw = new BufferedWriter(new FileWriter(filePathTo));
+		while (reader.next(key, value)) {
+			bw.write(key + "\t" + value.toString());
+			bw.newLine();
+		}
+		bw.close();
+		reader.close();
+
+		TestUtils.compareWithResult(new File(EXPECTED_PATH), new File(
+				CONVERT_RESULT));
 		return true;
 	}