fix comparator bug in VLongKmer

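The old VLongPointable comparator sized the comparison with integer
division, so the ceil was a no-op and trailing kmer bytes were never
compared; its getLong also re-read the same byte on every iteration.
The two offending lines, quoted from the removed code:

    int l = (int) Math.ceil(n / 4);        // n / 4 truncates before ceil runs
    r += (long) (bytes[start + 1] & 0xff); // index never advances with i

VLongPointable is replaced by VLongKmerPointable, whose compareTo orders
keys by field length and then byte-by-byte over the whole field, and the
fixed 8-byte big-endian getLong now lives in
KmerHashPartitioncomputerFactory, shared with
VLongNormalizedKeyComputerFactory and the hash function family. Unused
classes (Integer64NormalizedKeyComputerFactory,
LongBinaryHashFunctionFamily, LongHashFunctionFamily,
MurmurHash3BinaryHashFunctionFamily, FileScanDescriptor,
PrinterOperatorDescriptor, Tester, JobGenContigsGeneration,
JobGenGraphCleanning) are emptied, and the JobRunTest group-by tests now
write binary output.
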
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3112 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
index dae1b6f..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -1,53 +0,0 @@
-package edu.uci.ics.genomix.data.normalizers;
-
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
-
-/**
- * Aggregation sort: speed up from hyracks
- * 
- */
-public class Integer64NormalizedKeyComputerFactory implements
-		INormalizedKeyComputerFactory {
-	private static final long serialVersionUID = 8735044913496854551L;
-
-	@Override
-	public INormalizedKeyComputer createNormalizedKeyComputer() {
-		return new INormalizedKeyComputer() {
-			private static final int POSTIVE_LONG_MASK = (3 << 30);
-			private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
-			private static final int NEGATIVE_LONG_MASK = (0 << 30);
-
-			@Override
-			public int normalize(byte[] bytes, int start, int length) {
-				long value = Integer64SerializerDeserializer.getLong(bytes,
-						start);
-				int highValue = (int) (value >> 32);
-				if (highValue > 0) {
-					/** * larger than Integer.MAX */
-					int highNmk = getKey(highValue);
-					highNmk >>= 2;
-					highNmk |= POSTIVE_LONG_MASK;
-					return highNmk;
-				} else if (highValue == 0) {
-					/** * smaller than Integer.MAX but >=0 */
-					int lowNmk = (int) value;
-					lowNmk >>= 2;
-					lowNmk |= NON_NEGATIVE_INT_MASK;
-					return lowNmk;
-				} else {
-					/** * less than 0: have not optimized for that */
-					int highNmk = getKey(highValue);
-					highNmk >>= 2;
-					highNmk |= NEGATIVE_LONG_MASK;
-					return highNmk;
-				}
-			}
-
-			private int getKey(int value) {
-				return value ^ Integer.MIN_VALUE;
-			}
-		};
-	}
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
index 969431c..17ca8cb 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
@@ -1,5 +1,6 @@
 package edu.uci.ics.genomix.data.normalizers;
 
+import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 
@@ -14,25 +15,12 @@
 			private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
 			private static final int NEGATIVE_LONG_MASK = (0 << 30);
 
-			private long getLong(byte[] bytes, int offset) {
-				int l = (int) Math.ceil((double) bytes[offset] / 4.0);
-				int n = (l < 8) ? l : 8;
-
-				long r = 0;
-				for (int i = 0; i < n; i++) {
-					r <<= 8;
-					r += (long) (bytes[offset + i + 1] & 0xff);
-				}
-
-				return r;
-			}
-
 			/**
 			 * one kmer
 			 */
 			@Override
 			public int normalize(byte[] bytes, int start, int length) {
-				long value = getLong(bytes, start);
+				long value = KmerHashPartitioncomputerFactory.getLong(bytes, start);
 
 				int highValue = (int) (value >> 32);
 				if (highValue > 0) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
index d56c7c2..ce60917 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
@@ -18,6 +18,14 @@
 		
 		return hash;
 	}
+	
+	public static long getLong(byte[] bytes, int offset) {
+        return (((long) (bytes[offset] & 0xff)) << 56) + (((long) (bytes[offset + 1] & 0xff)) << 48)
+                + (((long) (bytes[offset + 2] & 0xff)) << 40) + (((long) (bytes[offset + 3] & 0xff)) << 32)
+                + (((long) (bytes[offset + 4] & 0xff)) << 24) + (((long) (bytes[offset + 5] & 0xff)) << 16)
+                + (((long) (bytes[offset + 6] & 0xff)) << 8) + (((long) (bytes[offset + 7] & 0xff)) << 0);
+    }
+
 
 	@Override
 	public ITuplePartitionComputer createPartitioner() {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java
index 6477e14..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java
@@ -1,30 +0,0 @@
-package edu.uci.ics.genomix.data.std.accessors;
-
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.data.std.api.IHashable;
-import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
-
-public class LongBinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public IBinaryHashFunction createBinaryHashFunction(final int seed) {
-
-		return new IBinaryHashFunction() {
-			private LongPointable p = new LongPointable();
-
-			@Override
-			public int hash(byte[] bytes, int offset, int length) {
-				if (length + offset >= bytes.length)
-					throw new IllegalStateException("out of bound");
-				p.set(bytes, offset, length);
-				int hash = Math.abs(((IHashable) p).hash() * (seed + 1));
-				if (hash < 0) {
-					hash = Math.abs(hash + 1);
-				}
-				return hash;
-			}
-		};
-	}
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
index 661559f..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
@@ -1,40 +0,0 @@
-package edu.uci.ics.genomix.data.std.accessors;
-
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
-
-public class LongHashFunctionFamily implements IBinaryHashFunctionFamily {
-	public static final IBinaryHashFunctionFamily INSTANCE = new LongHashFunctionFamily();
-
-	private static final long serialVersionUID = 1L;
-
-	static final int[] primeCoefficents = { 31, 23, 53, 97, 71, 337, 11, 877,
-			3, 29 };
-
-	private LongHashFunctionFamily() {
-	}
-
-	@Override
-	public IBinaryHashFunction createBinaryHashFunction(int seed) {
-		final int coefficient = primeCoefficents[seed % primeCoefficents.length];
-		final int r = primeCoefficents[(seed + 1) % primeCoefficents.length];
-
-		return new IBinaryHashFunction() {
-			@Override
-			public int hash(byte[] bytes, int offset, int length) {
-				int h = 0;
-				int utflen = UTF8StringPointable.getUTFLength(bytes, offset);
-				int sStart = offset + 2;
-				int c = 0;
-
-				while (c < utflen) {
-					char ch = UTF8StringPointable.charAt(bytes, sStart + c);
-					h = (coefficient * h + ch) % r;
-					c += UTF8StringPointable.charSize(bytes, sStart + c);
-				}
-				return h;
-			}
-		};
-	}
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
index b1db6f2..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
@@ -1,77 +0,0 @@
-package edu.uci.ics.genomix.data.std.accessors;
-
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-
-public class MurmurHash3BinaryHashFunctionFamily implements
-		IBinaryHashFunctionFamily {
-
-	public static final IBinaryHashFunctionFamily INSTANCE = new MurmurHash3BinaryHashFunctionFamily();
-
-	private static final long serialVersionUID = 1L;
-
-	private MurmurHash3BinaryHashFunctionFamily() {
-	}
-
-	private static final int C1 = 0xcc9e2d51;
-	private static final int C2 = 0x1b873593;
-	private static final int C3 = 5;
-	private static final int C4 = 0xe6546b64;
-	private static final int C5 = 0x85ebca6b;
-	private static final int C6 = 0xc2b2ae35;
-
-	@Override
-	public IBinaryHashFunction createBinaryHashFunction(final int seed) {
-		return new IBinaryHashFunction() {
-			@Override
-			public int hash(byte[] bytes, int offset, int length) {
-				int h = seed;
-				int p = offset;
-				int remain = length;
-				while (remain > 4) {
-					int k = ((int) bytes[p]) | (((int) bytes[p + 1]) << 8)
-							| (((int) bytes[p + 2]) << 16)
-							| (((int) bytes[p + 3]) << 24);
-					k *= C1;
-					k = Integer.rotateLeft(k, 15);
-					k *= C2;
-					h ^= k;
-					h = Integer.rotateLeft(h, 13);
-					h = h * C3 + C4;
-					p += 4;
-					remain -= 4;
-				}
-				int k = 0;
-				for (int i = 0; remain > 0; i += 8) {
-					k ^= bytes[p++] << i;
-					remain--;
-				}
-				k *= C1;
-				k = Integer.rotateLeft(k, 15);
-				k *= C2;
-				h ^= k;
-				// switch (remain) {
-				// case 3:
-				// k = bytes[p++];
-				// case 2:
-				// k = (k << 8) | bytes[p++];
-				// case 1:
-				// k = (k << 8) | bytes[p++];
-				// k *= C1;
-				// k = Integer.rotateLeft(k, 15);
-				// k *= C2;
-				// h ^= k;
-				// h = Integer.rotateLeft(h, 13);
-				// h = h * C3 + C4;
-				// }
-				h ^= length;
-				h ^= (h >>> 16);
-				h *= C5;
-				h ^= (h >>> 13);
-				h *= C6;
-				h ^= (h >>> 16);
-				return h;
-			}
-		};
-	}
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java
index b9a0443..7ead93e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java
@@ -1,6 +1,6 @@
 package edu.uci.ics.genomix.data.std.accessors;
 
-import edu.uci.ics.genomix.data.std.primitive.VLongPointable;
+import edu.uci.ics.genomix.data.std.primitive.VLongKmerPointable;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import edu.uci.ics.hyracks.data.std.api.IHashable;
@@ -12,14 +12,14 @@
 	public IBinaryHashFunction createBinaryHashFunction(final int seed) {
 
 		return new IBinaryHashFunction() {
-			private VLongPointable p = new VLongPointable();
+			private VLongKmerPointable p = new VLongKmerPointable();
 
 			@Override
 			public int hash(byte[] bytes, int offset, int length) {
 				if (length + offset >= bytes.length)
 					throw new IllegalStateException("out of bound");
 				p.set(bytes, offset, length);
-				int hash = Math.abs(((IHashable) p).hash() * (seed + 1));
+				int hash = Math.abs(p.hash() * (seed + 1));
 				if (hash < 0) {
 					hash = Math.abs(hash + 1);
 				}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongKmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongKmerPointable.java
new file mode 100644
index 0000000..cb71310
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongKmerPointable.java
@@ -0,0 +1,111 @@
+package edu.uci.ics.genomix.data.std.primitive;
+
+import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.api.AbstractPointable;
+import edu.uci.ics.hyracks.data.std.api.IComparable;
+import edu.uci.ics.hyracks.data.std.api.IHashable;
+import edu.uci.ics.hyracks.data.std.api.INumeric;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+
+public final class VLongKmerPointable extends AbstractPointable implements
+		IHashable, IComparable, INumeric {
+	public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean isFixedLength() {
+			return false;
+		}
+
+		@Override
+		public int getFixedLength() {
+			return -1;
+		}
+	};
+
+	public static final IPointableFactory FACTORY = new IPointableFactory() {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public IPointable createPointable() {
+			return new VLongKmerPointable();
+		}
+
+		@Override
+		public ITypeTraits getTypeTraits() {
+			return TYPE_TRAITS;
+		}
+	};
+
+
+	public long getLong() {
+		return KmerHashPartitioncomputerFactory.getLong(bytes, start);
+	}
+
+	@Override
+	public int compareTo(IPointable pointer) {
+		return compareTo(pointer.getByteArray(), pointer.getStartOffset(),
+				pointer.getLength());
+	}
+
+	@Override
+	public int compareTo(byte[] bytes, int start, int length) {
+
+		if (this.length != length) {
+			return this.length - length;
+		}
+
+		for (int i = 0; i < length; i++) {
+			if (this.bytes[this.start + i] < bytes[start + i]) {
+				return -1;
+			} else if (this.bytes[this.start + i] > bytes[start + i]) {
+				return 1;
+			}
+		}
+		return 0;
+	}
+
+	@Override
+	public int hash() {// BKDRHash
+		int hash = 1;
+		for (int i = start + 1; i <= start + length; i++)
+			hash = (31 * hash) + (int) bytes[i];
+		if (hash < 0) {
+			hash = -(hash + 1);
+		}
+		return hash;
+	}
+
+	@Override
+	public byte byteValue() {
+		return (byte) bytes[start + 1];
+	}
+
+	@Override
+	public short shortValue() {
+
+		return (short) (((bytes[start + 2] & 0xff) << 8) + (bytes[start + 1] & 0xff));
+	}
+
+	@Override
+	public int intValue() {
+		return (int) (((bytes[start + 4] & 0xff) << 24) + ((bytes[start + 3] & 0xff) << 16) + ((bytes[start + 2] & 0xff) << 8) + (bytes[start + 1] & 0xff));
+	}
+
+	@Override
+	public long longValue() {
+		return getLong();
+	}
+
+	@Override
+	public float floatValue() {
+		return getLong();
+	}
+
+	@Override
+	public double doubleValue() {
+		return getLong();
+	}
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
index c49d6ff..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
@@ -1,146 +0,0 @@
-package edu.uci.ics.genomix.data.std.primitive;
-
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.data.std.api.AbstractPointable;
-import edu.uci.ics.hyracks.data.std.api.IComparable;
-import edu.uci.ics.hyracks.data.std.api.IHashable;
-import edu.uci.ics.hyracks.data.std.api.INumeric;
-import edu.uci.ics.hyracks.data.std.api.IPointable;
-import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
-
-public final class VLongPointable extends AbstractPointable implements
-		IHashable, IComparable, INumeric {
-	public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean isFixedLength() {
-			return false;
-		}
-
-		@Override
-		public int getFixedLength() {
-			return -1;
-		}
-	};
-
-	public static final IPointableFactory FACTORY = new IPointableFactory() {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public IPointable createPointable() {
-			return new VLongPointable();
-		}
-
-		@Override
-		public ITypeTraits getTypeTraits() {
-			return TYPE_TRAITS;
-		}
-	};
-
-	public static long getLong(byte[] bytes, int start) {
-		int l = (int) Math.ceil((double) bytes[start] / 4.0);
-		int n = (l < 8) ? l : 8;
-
-		long r = 0;
-		for (int i = 0; i < n; i++) {
-			r <<= 8;
-			r += (long) (bytes[start + 1] & 0xff);
-		}
-
-		return r;
-	}
-
-	public long getLong() {
-		return getLong(bytes, start);
-	}
-
-	public byte[] postIncrement() {
-		int i = start + 1;
-		int l = (int) Math.ceil(bytes[start] / 4);
-		while (i <= start + l) {
-			bytes[i] += 1;
-			if (bytes[i] != 0) {
-				break;
-			}
-			i += 1;
-		}
-		return bytes;
-	}
-
-	@Override
-	public int compareTo(IPointable pointer) {
-		return compareTo(pointer.getByteArray(), pointer.getStartOffset(),
-				pointer.getLength());
-	}
-
-	@Override
-	public int compareTo(byte[] bytes, int start, int length) {
-
-		int be = this.start;
-
-		if (this.bytes[be] != bytes[start]) {
-			return (this.bytes[be] < bytes[start]) ? -1 : 1;
-		}
-
-		int n = this.bytes[be];
-		int l = (int) Math.ceil(n / 4);
-		for (int i = l; i > 0; i--) {
-			if (this.bytes[be + i] < bytes[start + i]) {
-				return -1;
-			} else if (this.bytes[be + i] > bytes[start + i]) {
-				return 1;
-			}
-		}
-		return 0;
-	}
-
-	@Override
-	public int hash() {// BKDRHash
-		int hash = 1;
-		for (int i = start + 1; i <= start + length; i++)
-			hash = (31 * hash) + (int) bytes[i];
-		if (hash < 0) {
-			hash = -hash;
-		}
-		// System.err.println(hash);
-		return hash;
-		/*
-		 * int seed = 131; // 31 131 1313 13131 131313 etc.. int hash = 0; int l
-		 * = (int) Math.ceil((double) bytes[start] / 4.0); for (int i = start +
-		 * 1; i <= start + l; i++) { hash = hash * seed + bytes[i]; } return
-		 * (hash & 0x7FFFFFFF);
-		 */
-	}
-
-	@Override
-	public byte byteValue() {
-		return (byte) bytes[start + 1];
-	}
-
-	@Override
-	public short shortValue() {
-
-		return (short) ((bytes[start + 2] & 0xff) << 8 + bytes[start + 1] & 0xff);
-	}
-
-	@Override
-	public int intValue() {
-		return (int) ((bytes[start + 4] & 0xff) << 24 + (bytes[start + 3] & 0xff) << 16 + (bytes[start + 2] & 0xff) << 8 + bytes[start + 1] & 0xff);
-	}
-
-	@Override
-	public long longValue() {
-		return getLong();
-	}
-
-	@Override
-	public float floatValue() {
-		return getLong();
-	}
-
-	@Override
-	public double doubleValue() {
-		return getLong();
-	}
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index ec71111..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -1,176 +0,0 @@
-package edu.uci.ics.genomix.dataflow;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.nio.ByteBuffer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.Path;
-
-import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.type.Kmer;
-import edu.uci.ics.genomix.type.Kmer.GENE_CODE;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-public class FileScanDescriptor extends
-		AbstractSingleActivityOperatorDescriptor {
-
-	private static final long serialVersionUID = 1L;
-	private int k;
-	private String pathSurfix;
-	private int byteNum;
-
-	public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k,
-			String path) {
-		super(spec, 0, 1);
-		this.k = k;
-		this.pathSurfix = path;
-
-		byteNum = (int) Math.ceil((double) k / 4.0);
-		// recordDescriptors[0] = news RecordDescriptor(
-		// new ISerializerDeserializer[] {
-		// UTF8StringSerializerDeserializer.INSTANCE });
-		recordDescriptors[0] = new RecordDescriptor(
-				new ISerializerDeserializer[] { null, null });
-	}
-
-	public FileScanDescriptor(JobSpecification jobSpec, int kmers,
-			Path[] inputPaths) {
-		super(jobSpec, 0, 1);
-		this.k = kmers;
-		this.pathSurfix = inputPaths[0].toString();
-		// recordDescriptors[0] = news RecordDescriptor(
-		// new ISerializerDeserializer[] {
-		// UTF8StringSerializerDeserializer.INSTANCE });
-		recordDescriptors[0] = new RecordDescriptor(
-				new ISerializerDeserializer[] { null,
-						ByteSerializerDeserializer.INSTANCE });
-	}
-
-	public IOperatorNodePushable createPushRuntime(
-			final IHyracksTaskContext ctx,
-			IRecordDescriptorProvider recordDescProvider, int partition,
-			int nPartitions) {
-
-		final int temp = partition;
-
-		// TODO Auto-generated method stub
-		return (IOperatorNodePushable) new AbstractUnaryOutputSourceOperatorNodePushable() {
-			private ArrayTupleBuilder tupleBuilder;
-			private ByteBuffer outputBuffer;
-			private FrameTupleAppender outputAppender;
-
-			@Override
-			public void initialize() {
-
-				tupleBuilder = new ArrayTupleBuilder(2);
-				outputBuffer = ctx.allocateFrame();
-				outputAppender = new FrameTupleAppender(ctx.getFrameSize());
-				outputAppender.reset(outputBuffer, true);
-
-				try {// one try with multiple catch?
-					writer.open();
-					String s = pathSurfix + String.valueOf(temp);
-
-					File tf = new File(s);
-
-					File[] fa = tf.listFiles();
-
-					for (int i = 0; i < fa.length; i++) {
-						BufferedReader readsfile = new BufferedReader(
-								new InputStreamReader(
-										new FileInputStream(fa[i])));
-						String read = readsfile.readLine();
-						// int count = 0;
-						while (read != null) {
-							read = readsfile.readLine();
-							// if(count % 4 == 1)
-							Pattern genePattern = Pattern.compile("[AGCT]+");
-							Matcher geneMatcher = genePattern.matcher(read);
-							boolean isValid = geneMatcher.matches();
-							if (isValid) {
-								SplitReads(read.getBytes(),writer);
-							}
-							// count += 1;
-							// System.err.println(count);
-						}
-					}
-					if (outputAppender.getTupleCount() > 0) {
-						FrameUtils.flushFrame(outputBuffer, writer);
-					}
-					outputAppender = null;
-					outputBuffer = null;
-					// sort code for external sort here?
-					writer.close();
-				} catch (Exception e) {
-					// TODO Auto-generated catch block
-					throw new IllegalStateException(e);
-				}
-			}
-
-			private void SplitReads(byte[] array, IFrameWriter writer) {
-				/** first kmer */
-				byte[] kmer = Kmer.CompressKmer(k, array, 0);
-				byte pre = 0;
-				byte next = GENE_CODE.getAdjBit(array[k]);
-				InsertToFrame(kmer, pre, next, writer);
-
-				/** middle kmer */
-				for (int i = k; i < array.length - 1; i++) {
-					pre = Kmer.MoveKmer(k, kmer, array[i]);
-					next = GENE_CODE.getAdjBit(array[i + 1]);
-					InsertToFrame(kmer, pre, next, writer);
-
-				}
-				/** last kmer */
-				pre = Kmer.MoveKmer(k, kmer, array[array.length - 1]);
-				next = 0;
-				InsertToFrame(kmer, pre, next, writer);
-			}
-
-			private void InsertToFrame(byte[] kmer, byte pre, byte next,
-					IFrameWriter writer) {
-				try {
-					byte adj = GENE_CODE.mergePreNextAdj(pre, next);
-					tupleBuilder.reset();
-					tupleBuilder.addField(kmer, 0, byteNum);
-					tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,
-							adj);
-
-					if (!outputAppender.append(
-							tupleBuilder.getFieldEndOffsets(),
-							tupleBuilder.getByteArray(), 0,
-							tupleBuilder.getSize())) {
-						FrameUtils.flushFrame(outputBuffer, writer);
-						outputAppender.reset(outputBuffer, true);
-						if (!outputAppender.append(
-								tupleBuilder.getFieldEndOffsets(),
-								tupleBuilder.getByteArray(), 0,
-								tupleBuilder.getSize())) {
-							throw new IllegalStateException(
-									"Failed to copy an record into a frame: the record size is too large.");
-						}
-					}
-				} catch (Exception e) {
-					throw new IllegalStateException(e);
-				}
-			}
-		};
-
-	}
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
index 3cc4b8a..51e5221 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
@@ -38,6 +38,7 @@
 
 		KmerCountValue reEnterCount = new KmerCountValue();
 		BytesWritable reEnterKey = new BytesWritable();
+
 		/**
 		 * assumption is that output never change source!
 		 */
@@ -48,18 +49,19 @@
 				byte[] kmer = tuple.getFieldData(0);
 				int keyStart = tuple.getFieldStart(0);
 				int keyLength = tuple.getFieldLength(0);
-				
+
 				byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
 				byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
 				reEnterCount.reset(bitmap, count);
 				reEnterKey.set(kmer, keyStart, keyLength);
 				writer.append(reEnterKey, reEnterCount);
-				// @mark: this method can not used for read in hadoop 0.20.2. 
-				//writer.appendRaw(kmer, keyStart, keyLength, reEnterCount);
+				// @mark: this method cannot be used for reads in hadoop 0.20.2.
+				// writer.appendRaw(kmer, keyStart, keyLength, reEnterCount);
 			} catch (IOException e) {
 				throw new HyracksDataException(e);
 			}
 		}
+
 		@Override
 		public void open(DataOutput output) throws HyracksDataException {
 			try {
@@ -70,10 +72,10 @@
 				throw new HyracksDataException(e);
 			}
 		}
+
 		@Override
 		public void close(DataOutput output) throws HyracksDataException {
 			// TODO Auto-generated method stub
-			
 		}
 	}
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
index b93e723..0975fd2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
@@ -51,7 +51,6 @@
 		@Override
 		public void close(DataOutput output) throws HyracksDataException {
 			// TODO Auto-generated method stub
-			
 		}
 	}
 
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
index f8decc3..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
@@ -1,151 +0,0 @@
-package edu.uci.ics.genomix.dataflow;
-
-import java.io.BufferedWriter;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-
-public class PrinterOperatorDescriptor extends
-		AbstractSingleActivityOperatorDescriptor {
-
-	private static final long serialVersionUID = 1L;
-	private String filename;
-	private boolean writeFile;
-	private BufferedWriter twriter;
-	private FileOutputStream stream;
-
-	/**
-	 * The constructor of HDFSWriteOperatorDescriptor.
-	 * 
-	 * @param spec
-	 *            the JobSpecification object
-	 * @param conf
-	 *            the Hadoop JobConf which contains the output path
-	 * @param tupleWriterFactory
-	 *            the ITupleWriterFactory implementation object
-	 * @throws HyracksException
-	 */
-	public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec) {
-		super(spec, 1, 0);
-		writeFile = false;
-	}
-
-	public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec,
-			String filename) {
-		super(spec, 1, 0);
-		this.filename = filename;
-		writeFile = true;
-	}
-
-	@Override
-	public IOperatorNodePushable createPushRuntime(
-			final IHyracksTaskContext ctx,
-			final IRecordDescriptorProvider recordDescProvider,
-			final int partition, final int nPartitions)
-			throws HyracksDataException {
-
-		return new AbstractUnaryInputSinkOperatorNodePushable() {
-			private RecordDescriptor inputRd = recordDescProvider
-					.getInputRecordDescriptor(getActivityId(), 0);;
-			private FrameTupleAccessor accessor = new FrameTupleAccessor(
-					ctx.getFrameSize(), inputRd);
-			private FrameTupleReference tuple = new FrameTupleReference();
-
-			@Override
-			public void open() throws HyracksDataException {
-				if (true == writeFile) {
-					try {
-						filename = filename + String.valueOf(partition)
-								+ ".txt";
-						// System.err.println(filename);
-						stream = new FileOutputStream(filename);
-					} catch (FileNotFoundException e) {
-						// TODO Auto-generated catch block
-						e.printStackTrace();
-					}
-					twriter = new BufferedWriter(new OutputStreamWriter(stream));
-				}
-			}
-
-			private void PrintBytes(int no) {
-				try {
-
-					byte[] bytes = tuple.getFieldData(no);
-					int offset = tuple.getFieldStart(no);
-					int length = tuple.getFieldLength(no);
-					if (true == writeFile) {
-						for (int j = offset; j < offset + length; j++) {
-							twriter.write(String.valueOf((int) bytes[j]));
-							twriter.write(" ");
-						}
-						twriter.write("&&");
-					} else {
-						for (int j = offset; j < offset + length; j++) {
-							System.err.print(String.valueOf((int) bytes[j]));
-							System.err.print(" ");
-						}
-						System.err.print("&&");
-					}
-				} catch (IOException e) {
-					e.printStackTrace();
-				}
-			}
-
-			@Override
-			public void nextFrame(ByteBuffer buffer)
-					throws HyracksDataException {
-				try {
-					accessor.reset(buffer);
-					int tupleCount = accessor.getTupleCount();
-					for (int i = 0; i < tupleCount; i++) {
-						tuple.reset(accessor, i);
-						int tj = tuple.getFieldCount();
-						for (int j = 0; j < tj; j++) {
-							PrintBytes(j);
-						}
-						if (true == writeFile) {
-							twriter.write("\r\n");
-						} else {
-							System.err.println("");
-						}
-					}
-				} catch (IOException e) {
-					e.printStackTrace();
-				}
-			}
-
-			@Override
-			public void fail() throws HyracksDataException {
-
-			}
-
-			@Override
-			public void close() throws HyracksDataException {
-				if (true == writeFile) {
-					try {
-						twriter.close();
-						stream.close();
-					} catch (IOException e) {
-						// TODO Auto-generated catch block
-						e.printStackTrace();
-					}
-				}
-			}
-
-		};
-	}
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index 6256f86..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -1,422 +0,0 @@
-package edu.uci.ics.genomix.dataflow;
-
-import java.io.File;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.genomix.data.normalizers.VLongNormalizedKeyComputerFactory;
-import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
-import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;
-import edu.uci.ics.genomix.data.std.primitive.VLongPointable;
-import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
-import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
-import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
-import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-
-public class Tester {
-
-	private static final Logger LOGGER = Logger.getLogger(Tester.class
-			.getName());
-	public static final String NC1_ID = "nc1";
-	public static final String NC2_ID = "nc2";
-	public static final String NC3_ID = "nc3";
-	public static final String NC4_ID = "nc4";
-
-	private static ClusterControllerService cc;
-	private static NodeControllerService nc1;
-	private static NodeControllerService nc2;
-	private static NodeControllerService nc3;
-	private static NodeControllerService nc4;
-	private static IHyracksClientConnection hcc;
-
-	// private static final boolean DEBUG = true;
-
-	public static void main(String[] args) throws Exception {
-
-		try {
-			LOGGER.setLevel(Level.OFF);
-
-			init();
-
-			// Options options = new Options();
-
-			IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1",
-					39000);
-
-			/*
-			 * JobSpecification job =
-			 * createJob(parseFileSplits(options.inFileCustomerSplits),
-			 * parseFileSplits(options.inFileOrderSplits),
-			 * parseFileSplits(options.outFileSplits),
-			 * options.numJoinPartitions, options.algo, options.graceInputSize,
-			 * options.graceRecordsPerFrame, options.graceFactor,
-			 * options.memSize, options.tableSize, options.hasGroupBy);
-			 */
-
-			int k, page_num;
-			String file_name = args[0];
-			k = Integer.parseInt(args[1]);
-			page_num = Integer.parseInt(args[2]);
-			int type = Integer.parseInt(args[3]);
-
-			JobSpecification job = createJob(file_name, k, page_num, type);
-
-			long start = System.currentTimeMillis();
-			JobId jobId = hcc.startJob("test", job);
-			hcc.waitForCompletion(jobId);
-			long end = System.currentTimeMillis();
-			System.err.println(start + " " + end + " " + (end - start));
-		} finally {
-		}
-		/*
-		 * 
-		 * String s = "g:\\data\\results.txt" ;
-		 * 
-		 * filenames = new FileOutputStream(s); // filenames = new
-		 * FileInputStream("filename.txt");
-		 * 
-		 * BufferedWriter writer = new BufferedWriter(new
-		 * OutputStreamWriter(filenames)); writer.write((int) (end-start));
-		 * writer.close();
-		 */
-
-	}
-
-	public static void init() throws Exception {
-		CCConfig ccConfig = new CCConfig();
-		ccConfig.clientNetIpAddress = "127.0.0.1";
-		ccConfig.clientNetPort = 39000;
-		ccConfig.clusterNetIpAddress = "127.0.0.1";
-		ccConfig.clusterNetPort = 39001;
-		ccConfig.profileDumpPeriod = -1;
-		File outDir = new File("target/ClusterController");
-		outDir.mkdirs();
-		File ccRoot = File.createTempFile(Tester.class.getName(), ".data",
-				outDir);
-		ccRoot.delete();
-		ccRoot.mkdir();
-		ccConfig.ccRoot = ccRoot.getAbsolutePath();
-		cc = new ClusterControllerService(ccConfig);
-		cc.start();
-		ccConfig.defaultMaxJobAttempts = 0;
-
-		NCConfig ncConfig1 = new NCConfig();
-		ncConfig1.ccHost = "localhost";
-		ncConfig1.ccPort = 39001;
-		ncConfig1.clusterNetIPAddress = "127.0.0.1";
-		ncConfig1.dataIPAddress = "127.0.0.1";
-		ncConfig1.nodeId = NC1_ID;
-		nc1 = new NodeControllerService(ncConfig1);
-		nc1.start();
-
-//		NCConfig ncConfig2 = new NCConfig();
-//		ncConfig2.ccHost = "localhost";
-//		ncConfig2.ccPort = 39001;
-//		ncConfig2.clusterNetIPAddress = "127.0.0.1";
-//		ncConfig2.dataIPAddress = "127.0.0.1";
-//		ncConfig2.nodeId = NC2_ID;
-//		nc2 = new NodeControllerService(ncConfig2);
-//		nc2.start();
-//		NCConfig ncConfig3 = new NCConfig();
-//		ncConfig3.ccHost = "localhost";
-//		ncConfig3.ccPort = 39001;
-//		ncConfig3.clusterNetIPAddress = "127.0.0.1";
-//		ncConfig3.dataIPAddress = "127.0.0.1";
-//		ncConfig3.nodeId = NC3_ID;
-//		nc3 = new NodeControllerService(ncConfig3);
-//		nc3.start();
-//		NCConfig ncConfig4 = new NCConfig();
-//		ncConfig4.ccHost = "localhost";
-//		ncConfig4.ccPort = 39001;
-//		ncConfig4.clusterNetIPAddress = "127.0.0.1";
-//		ncConfig4.dataIPAddress = "127.0.0.1";
-//		ncConfig4.nodeId = NC4_ID;
-//		nc4 = new NodeControllerService(ncConfig4);
-//		nc4.start();
-
-		hcc = new HyracksConnection(ccConfig.clientNetIpAddress,
-				ccConfig.clientNetPort);
-		hcc.createApplication("test", null);
-		if (LOGGER.isLoggable(Level.INFO)) {
-			LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
-		}
-	}
-
-	private static JobSpecification createJob(String filename, int k,
-			int page_num, int type) throws HyracksDataException {
-		JobSpecification spec = new JobSpecification();
-
-		// spec.setFrameSize(32768);
-		spec.setFrameSize(32768);
-
-		FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);
-//		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
-//				NC1_ID, NC2_ID, NC3_ID, NC4_ID);
-		 PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
-		 NC1_ID);
-
-		RecordDescriptor outputRec = new RecordDescriptor(
-				new ISerializerDeserializer[] { null,
-						ByteSerializerDeserializer.INSTANCE,
-						ByteSerializerDeserializer.INSTANCE });
-		// Integer64SerializerDeserializer.INSTANCE,
-		// ByteSerializerDeserializer.INSTANCE,
-		// ByteSerializerDeserializer.INSTANCE });
-
-		int[] keyFields = new int[] { 0 };
-		int frameLimits = 4096; // hyracks oriented
-		int tableSize = 10485767; // hyracks oriented
-
-		AbstractOperatorDescriptor single_grouper;
-		IConnectorDescriptor conn_partition;
-		AbstractOperatorDescriptor cross_grouper;
-
-		if (0 == type) {// external group by
-			single_grouper = new ExternalGroupOperatorDescriptor(
-					spec,
-					keyFields,
-					frameLimits,
-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-							.of(VLongPointable.FACTORY) },
-					new VLongNormalizedKeyComputerFactory(),
-					new MergeKmerAggregateFactory(),
-					// new IntSumFieldAggregatorFactory(1, false) }),
-
-					new DistributedMergeLmerAggregateFactory(),
-					// new IntSumFieldAggregatorFactory(1, false) }),
-					outputRec,
-					new HashSpillableTableFactory(
-							new FieldHashPartitionComputerFactory(
-									keyFields,
-									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-											.of(VLongPointable.FACTORY) }),
-							tableSize), true);
-
-			conn_partition = new MToNPartitioningConnectorDescriptor(spec,
-					new KmerHashPartitioncomputerFactory());
-			cross_grouper = new ExternalGroupOperatorDescriptor(
-					spec,
-					keyFields,
-					frameLimits,
-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-							.of(VLongPointable.FACTORY) },
-					new VLongNormalizedKeyComputerFactory(),
-					new DistributedMergeLmerAggregateFactory(),
-					// new IntSumFieldAggregatorFactory(1, false) }),
-
-					new DistributedMergeLmerAggregateFactory(),
-					// new IntSumFieldAggregatorFactory(1, false) }),
-					outputRec,
-					new HashSpillableTableFactory(
-							new FieldHashPartitionComputerFactory(
-									keyFields,
-									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-											.of(VLongPointable.FACTORY) }),
-							tableSize), true);
-		} else if (1 == type) {
-			single_grouper = new ExternalGroupOperatorDescriptor(
-					spec,
-					keyFields,
-					frameLimits,
-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-							.of(VLongPointable.FACTORY) },
-					new VLongNormalizedKeyComputerFactory(),
-					new MergeKmerAggregateFactory(),
-					// new IntSumFieldAggregatorFactory(1, false) }),
-					new DistributedMergeLmerAggregateFactory(),
-					// new IntSumFieldAggregatorFactory(1, false) }),
-					outputRec,
-					new HashSpillableTableFactory(
-							new FieldHashPartitionComputerFactory(
-									keyFields,
-									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-											.of(VLongPointable.FACTORY) }),
-							tableSize), true);
-			conn_partition = new MToNPartitioningMergingConnectorDescriptor(
-					spec,
-					new KmerHashPartitioncomputerFactory(),
-					keyFields,
-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-							.of(VLongPointable.FACTORY) });
-			cross_grouper = new PreclusteredGroupOperatorDescriptor(
-					spec,
-					keyFields,
-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-							.of(VLongPointable.FACTORY) },
-					new DistributedMergeLmerAggregateFactory(), outputRec);
-		} else {
-			long inputSizeInRawRecords = 154000000;
-			long inputSizeInUniqueKeys = 38500000;
-			int recordSizeInBytes = 4;
-			int hashfuncStartLevel = 1;
-			single_grouper = new HybridHashGroupOperatorDescriptor(
-					spec,
-					keyFields,
-					frameLimits,
-					inputSizeInRawRecords,
-					inputSizeInUniqueKeys,
-					recordSizeInBytes,
-					tableSize,
-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-							.of(VLongPointable.FACTORY) },
-					new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
-					// new IBinaryHashFunctionFamily[]
-					// {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
-					hashfuncStartLevel,
-					new VLongNormalizedKeyComputerFactory(),
-					new MergeKmerAggregateFactory(),
-					new DistributedMergeLmerAggregateFactory(), outputRec, true);
-			conn_partition = new MToNPartitioningConnectorDescriptor(spec,
-					new KmerHashPartitioncomputerFactory());
-			recordSizeInBytes = 13;
-			cross_grouper = new HybridHashGroupOperatorDescriptor(
-					spec,
-					keyFields,
-					frameLimits,
-					inputSizeInRawRecords,
-					inputSizeInUniqueKeys,
-					recordSizeInBytes,
-					tableSize,
-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-							.of(VLongPointable.FACTORY) },
-					new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
-					// new IBinaryHashFunctionFamily[]
-					// {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
-					hashfuncStartLevel,
-					new VLongNormalizedKeyComputerFactory(),
-					new DistributedMergeLmerAggregateFactory(),
-					new DistributedMergeLmerAggregateFactory(), outputRec, true);
-		}
-
-		 PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-		 single_grouper, NC1_ID);
-//		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-//				single_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);
-
-		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
-				spec);
-		spec.connect(readfileConn, scan, 0, single_grouper, 0);
-
-		 PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-		 cross_grouper, NC1_ID);
-//		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-//				cross_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);
-		spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);
-
-		// PrinterOperatorDescriptor printer = new
-		// PrinterOperatorDescriptor(spec);
-		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec,
-				"result");
-//		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-//				NC1_ID, NC2_ID, NC3_ID, NC4_ID);
-		 PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-		 printer,
-		 NC1_ID);
-
-		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);
-		spec.connect(printConn, cross_grouper, 0, printer, 0);
-		// spec.connect(readfileConn, scan, 0, printer, 0);
-
-		spec.addRoot(printer);
-
-		if (1 == type) {
-			spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-		}
-		// System.out.println(spec.toString());
-		return spec;
-	}
-
-	static class JoinComparatorFactory implements ITuplePairComparatorFactory {
-		private static final long serialVersionUID = 1L;
-
-		private final IBinaryComparatorFactory bFactory;
-		private final int pos0;
-		private final int pos1;
-
-		public JoinComparatorFactory(IBinaryComparatorFactory bFactory,
-				int pos0, int pos1) {
-			this.bFactory = bFactory;
-			this.pos0 = pos0;
-			this.pos1 = pos1;
-		}
-
-		@Override
-		public ITuplePairComparator createTuplePairComparator(
-				IHyracksTaskContext ctx) {
-			return new JoinComparator(bFactory.createBinaryComparator(), pos0,
-					pos1);
-		}
-	}
-
-	static class JoinComparator implements ITuplePairComparator {
-
-		private final IBinaryComparator bComparator;
-		private final int field0;
-		private final int field1;
-
-		public JoinComparator(IBinaryComparator bComparator, int field0,
-				int field1) {
-			this.bComparator = bComparator;
-			this.field0 = field0;
-			this.field1 = field1;
-		}
-
-		@Override
-		public int compare(IFrameTupleAccessor accessor0, int tIndex0,
-				IFrameTupleAccessor accessor1, int tIndex1) {
-			int tStart0 = accessor0.getTupleStartOffset(tIndex0);
-			int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
-
-			int tStart1 = accessor1.getTupleStartOffset(tIndex1);
-			int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
-
-			int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
-			int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
-			int fLen0 = fEnd0 - fStart0;
-
-			int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
-			int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
-			int fLen1 = fEnd1 - fStart1;
-
-			int c = bComparator.compare(accessor0.getBuffer().array(), fStart0
-					+ fStartOffset0, fLen0, accessor1.getBuffer().array(),
-					fStart1 + fStartOffset1, fLen1);
-			if (c != 0) {
-				return c;
-			}
-			return 0;
-		}
-	}
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
index e0bd786..27066a2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -12,8 +12,6 @@
 import edu.uci.ics.genomix.job.GenomixJob;
 import edu.uci.ics.genomix.job.JobGen;
 import edu.uci.ics.genomix.job.JobGenBrujinGraph;
-import edu.uci.ics.genomix.job.JobGenContigsGeneration;
-import edu.uci.ics.genomix.job.JobGenGraphCleanning;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
@@ -84,12 +82,6 @@
 				jobGen = new JobGenBrujinGraph(job, scheduler, ncMap,
 						numPartitionPerMachine);
 				break;
-			case GRAPH_CLEANNING:
-				jobGen = new JobGenGraphCleanning(job);
-				break;
-			case CONTIGS_GENERATION:
-				jobGen = new JobGenContigsGeneration(job);
-				break;
 			}
 
 			start = System.currentTimeMillis();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index fd8af39..7c4dcf0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -11,7 +11,7 @@
 import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
 import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
 import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;
-import edu.uci.ics.genomix.data.std.primitive.VLongPointable;
+import edu.uci.ics.genomix.data.std.primitive.VLongKmerPointable;
 import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.genomix.dataflow.KMerSequenceWriterFactory;
 import edu.uci.ics.genomix.dataflow.KMerTextWriterFactory;
@@ -110,7 +110,7 @@
 				keyFields,
 				frameLimits,
 				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-						.of(VLongPointable.FACTORY) },
+						.of(VLongKmerPointable.FACTORY) },
 				new VLongNormalizedKeyComputerFactory(),
 				aggeragater,
 				new DistributedMergeLmerAggregateFactory(),
@@ -119,7 +119,7 @@
 						new FieldHashPartitionComputerFactory(
 								keyFields,
 								new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-										.of(VLongPointable.FACTORY) }),
+										.of(VLongKmerPointable.FACTORY) }),
 						tableSize), true);
 	}
 
@@ -137,7 +137,7 @@
 				recordSizeInBytes,
 				tableSize,
 				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-						.of(VLongPointable.FACTORY) },
+						.of(VLongKmerPointable.FACTORY) },
 				new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
 				hashfuncStartLevel, new VLongNormalizedKeyComputerFactory(),
 				new MergeKmerAggregateFactory(),
@@ -166,12 +166,12 @@
 					new KmerHashPartitioncomputerFactory(),
 					keyFields,
 					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-							.of(VLongPointable.FACTORY) });
+							.of(VLongKmerPointable.FACTORY) });
 			crossGrouper = new PreclusteredGroupOperatorDescriptor(
 					jobSpec,
 					keyFields,
 					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-							.of(VLongPointable.FACTORY) },
+							.of(VLongKmerPointable.FACTORY) },
 					new DistributedMergeLmerAggregateFactory(),
 					combineOutputRec);
 			break;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
index 6d30fad..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
@@ -1,23 +0,0 @@
-package edu.uci.ics.genomix.job;
-
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public class JobGenContigsGeneration extends JobGen {
-
-	public JobGenContigsGeneration(GenomixJob job) {
-		super(job);
-	}
-
-	@Override
-	public JobSpecification generateJob() {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	protected void initJobConfiguration() {
-		// TODO Auto-generated method stub
-
-	}
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
index 43b5a97..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
@@ -1,23 +0,0 @@
-package edu.uci.ics.genomix.job;
-
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public class JobGenGraphCleanning extends JobGen {
-
-	public JobGenGraphCleanning(GenomixJob job) {
-		super(job);
-	}
-
-	@Override
-	public JobSpecification generateJob() {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	protected void initJobConfiguration() {
-		// TODO Auto-generated method stub
-
-	}
-
-}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index f872ff9..12ca9c9 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -96,7 +96,7 @@
 		Path dest = new Path(HDFS_INPUT_PATH);
 		Path result = new Path(HDFS_OUTPUT_PATH);
 		dfs.mkdirs(dest);
-		dfs.mkdirs(result);
+		//dfs.mkdirs(result);
 		dfs.copyFromLocalFile(src, dest);
 
 		DataOutputStream confOutput = new DataOutputStream(
@@ -119,9 +119,9 @@
 
 	@Test
 	public void TestExternalGroupby() throws Exception {
-		cleanUpReEntry();
+		//cleanUpReEntry();
 		conf.set(GenomixJob.GROUPBY_TYPE, "external");
-		conf.set(GenomixJob.OUTPUT_FORMAT, "text");
+		conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
 		System.err.println("Testing ExternalGroupBy");
 		driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
 		Assert.assertEquals(true, checkResults());
@@ -131,7 +131,7 @@
 	public void TestPreClusterGroupby() throws Exception {
 		cleanUpReEntry();
 		conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
-		conf.set(GenomixJob.OUTPUT_FORMAT, "text");
+		conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
 		System.err.println("Testing PreClusterGroupBy");
 		driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
 		Assert.assertEquals(true, checkResults());
@@ -141,7 +141,7 @@
 	public void TestHybridGroupby() throws Exception {
 		cleanUpReEntry();
 		conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
-		conf.set(GenomixJob.OUTPUT_FORMAT, "text");
+		conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
 		System.err.println("Testing HybridGroupBy");
 		driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
 		Assert.assertEquals(true, checkResults());
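
All three group-by tests now exercise the binary output path. For eyeballing a result partition, a dump tool along the following lines could help; it assumes the "binary" OUTPUT_FORMAT produces Hadoop SequenceFiles, which is an assumption (the actual tuple writer lives elsewhere in the tree), and it uses the Hadoop 1.x-era API that the rest of this patch targets:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

// Hedged sketch: dumps one SequenceFile result partition as key<TAB>value,
// assuming the binary output format is a SequenceFile.
public class DumpBinaryResult {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path part = new Path(args[0]); // e.g. <HDFS_OUTPUT_PATH>/part-0
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, part, conf);
        try {
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            reader.close();
        }
    }
}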
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index f2b56fa..e82006c 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -256,7 +256,7 @@
 
                 outputAppender.reset(outputFrame, true);
 
-                writer.open();
+                //writer.open();
 
                 if (tPointers == null) {
                     // Not sorted
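
A plausible reading, inferred from the patch rather than stated by it: the frame writer handed to this flush path is already open, and the Hyracks IFrameWriter contract permits exactly one open()/close() pair per writer, so calling open() again here would re-run downstream setup (for the HDFS write operator below, re-creating and truncating the part file). The contract, as a sketch:

// Hyracks IFrameWriter lifecycle:
//   writer.open();          // exactly once per writer
//   writer.nextFrame(buf);  // zero or more times
//   writer.fail();          // optional, on error, before close()
//   writer.close();         // exactly once, always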
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index 36d5f55..71ec2d1 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -39,91 +39,102 @@
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
 
 /**
- * The HDFS file write operator using the Hadoop old API.
- * To use this operator, a user need to provide an ITupleWriterFactory.
+ * The HDFS file write operator using the Hadoop old API. To use this operator,
+ * a user needs to provide an ITupleWriterFactory.
  */
 @SuppressWarnings("deprecation")
-public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+public class HDFSWriteOperatorDescriptor extends
+		AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
-    private ConfFactory confFactory;
-    private ITupleWriterFactory tupleWriterFactory;
+	private static final long serialVersionUID = 1L;
+	private ConfFactory confFactory;
+	private ITupleWriterFactory tupleWriterFactory;
 
-    /**
-     * The constructor of HDFSWriteOperatorDescriptor.
-     * 
-     * @param spec
-     *            the JobSpecification object
-     * @param conf
-     *            the Hadoop JobConf which contains the output path
-     * @param tupleWriterFactory
-     *            the ITupleWriterFactory implementation object
-     * @throws HyracksException
-     */
-    public HDFSWriteOperatorDescriptor(JobSpecification spec, JobConf conf, ITupleWriterFactory tupleWriterFactory)
-            throws HyracksException {
-        super(spec, 1, 0);
-        this.confFactory = new ConfFactory(conf);
-        this.tupleWriterFactory = tupleWriterFactory;
-    }
+	/**
+	 * The constructor of HDFSWriteOperatorDescriptor.
+	 * 
+	 * @param spec
+	 *            the JobSpecification object
+	 * @param conf
+	 *            the Hadoop JobConf which contains the output path
+	 * @param tupleWriterFactory
+	 *            the ITupleWriterFactory implementation object
+	 * @throws HyracksException
+	 */
+	public HDFSWriteOperatorDescriptor(JobSpecification spec, JobConf conf,
+			ITupleWriterFactory tupleWriterFactory) throws HyracksException {
+		super(spec, 1, 0);
+		this.confFactory = new ConfFactory(conf);
+		this.tupleWriterFactory = tupleWriterFactory;
+	}
 
-    @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-            throws HyracksDataException {
+	@Override
+	public IOperatorNodePushable createPushRuntime(
+			final IHyracksTaskContext ctx,
+			final IRecordDescriptorProvider recordDescProvider,
+			final int partition, final int nPartitions)
+			throws HyracksDataException {
 
-        return new AbstractUnaryInputSinkOperatorNodePushable() {
+		return new AbstractUnaryInputSinkOperatorNodePushable() {
 
-            private FSDataOutputStream dos;
-            private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
-            private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
-            private FrameTupleReference tuple = new FrameTupleReference();
-            private ITupleWriter tupleWriter;
-            private ClassLoader ctxCL;
+			private FSDataOutputStream dos;
+			private RecordDescriptor inputRd = recordDescProvider
+					.getInputRecordDescriptor(getActivityId(), 0);
+			private FrameTupleAccessor accessor = new FrameTupleAccessor(
+					ctx.getFrameSize(), inputRd);
+			private FrameTupleReference tuple = new FrameTupleReference();
+			private ITupleWriter tupleWriter;
+			private ClassLoader ctxCL;
 
-            @Override
-            public void open() throws HyracksDataException {
-                ctxCL = Thread.currentThread().getContextClassLoader();
-                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-                JobConf conf = confFactory.getConf();
-                String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
-                String fileName = outputDirPath + File.separator + "part-" + partition;
+			@Override
+			public void open() throws HyracksDataException {
+				ctxCL = Thread.currentThread().getContextClassLoader();
+				Thread.currentThread().setContextClassLoader(
+						this.getClass().getClassLoader());
+				JobConf conf = confFactory.getConf();
+				String outputDirPath = FileOutputFormat.getOutputPath(conf)
+						.toString();
+				String fileName = outputDirPath + File.separator + "part-"
+						+ partition;
 
-                tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
-                try {
-                    FileSystem dfs = FileSystem.get(conf);
-                    dos = dfs.create(new Path(fileName), true);
-                } catch (Exception e) {
-                    throw new HyracksDataException(e);
-                }
-            }
+				tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
+				try {
+					FileSystem dfs = FileSystem.get(conf);
+					dos = dfs.create(new Path(fileName), true);
+					tupleWriter.open(dos);
+				} catch (Exception e) {
+					throw new HyracksDataException(e);
+				}
+			}
 
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                accessor.reset(buffer);
-                int tupleCount = accessor.getTupleCount();
-                for (int i = 0; i < tupleCount; i++) {
-                    tuple.reset(accessor, i);
-                    tupleWriter.write(dos, tuple);
-                }
-            }
+			@Override
+			public void nextFrame(ByteBuffer buffer)
+					throws HyracksDataException {
+				accessor.reset(buffer);
+				int tupleCount = accessor.getTupleCount();
+				for (int i = 0; i < tupleCount; i++) {
+					tuple.reset(accessor, i);
+					tupleWriter.write(dos, tuple);
+				}
+			}
 
-            @Override
-            public void fail() throws HyracksDataException {
+			@Override
+			public void fail() throws HyracksDataException {
 
-            }
+			}
 
-            @Override
-            public void close() throws HyracksDataException {
-                try {
-                    dos.close();
-                } catch (Exception e) {
-                    throw new HyracksDataException(e);
-                } finally {
-                    Thread.currentThread().setContextClassLoader(ctxCL);
-                }
-            }
+			@Override
+			public void close() throws HyracksDataException {
+				try {
+					tupleWriter.close(dos);
+					dos.close();
+				} catch (Exception e) {
+					throw new HyracksDataException(e);
+				} finally {
+					Thread.currentThread().setContextClassLoader(ctxCL);
+				}
+			}
 
-        };
-    }
+		};
+	}
 }
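
Underneath the reformatting, the substantive change is the new tupleWriter.open(dos) and tupleWriter.close(dos) calls bracketing the stream's lifetime. A minimal sketch of a writer that depends on those hooks follows; the signatures are inferred from the calls above (open(DataOutput), write(DataOutput, ITupleReference), close(DataOutput)) and may not match the real ITupleWriter interface exactly, and the header/footer behavior is invented purely to show why the hooks matter:

import java.io.DataOutput;
import java.io.IOException;

import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;

// Hypothetical writer: emits a header once per part file and a footer on
// close. Without the operator calling open()/close(), both would be lost.
public class HeaderFooterTupleWriter implements ITupleWriter {

    @Override
    public void open(DataOutput output) throws HyracksDataException {
        try {
            output.writeBytes("#header\n"); // runs once, right after dfs.create()
        } catch (IOException e) {
            throw new HyracksDataException(e);
        }
    }

    @Override
    public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
        try {
            // Write the first field of each tuple as raw bytes (illustrative).
            output.write(tuple.getFieldData(0), tuple.getFieldStart(0), tuple.getFieldLength(0));
        } catch (IOException e) {
            throw new HyracksDataException(e);
        }
    }

    @Override
    public void close(DataOutput output) throws HyracksDataException {
        try {
            output.writeBytes("#footer\n"); // runs before the HDFS stream closes
        } catch (IOException e) {
            throw new HyracksDataException(e);
        }
    }
}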
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 86ee527..d292673 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -92,6 +92,7 @@
                 try {
                     FileSystem dfs = FileSystem.get(conf.getConfiguration());
                     dos = dfs.create(new Path(fileName), true);
+                    tupleWriter.open(dos);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
@@ -115,6 +116,7 @@
             @Override
             public void close() throws HyracksDataException {
                 try {
+                    tupleWriter.close(dos);
                     dos.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
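
With both the hdfs (old API) and hdfs2 (new API) operators patched, the writer lifecycle is symmetric: tupleWriter.open(dos) runs once right after dfs.create(), write() runs per tuple in nextFrame(), and tupleWriter.close(dos) runs before dos.close(), so a writer that buffers output or defers a footer gets to flush while the HDFS stream is still writable. The ordering inside close() matters for exactly that reason: closing the stream first would lose anything the tuple writer still holds.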