Split genomix-core into genomix-core and genomix-hyracks

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3029 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-hyracks/src/main/assembly/binary-assembly.xml b/genomix/genomix-hyracks/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..68d424a
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+	<id>binary-assembly</id>
+	<formats>
+		<format>zip</format>
+		<format>dir</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>target/appassembler/bin</directory>
+			<outputDirectory>bin</outputDirectory>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<fileSet>
+			<directory>target/appassembler/lib</directory>
+			<outputDirectory>lib</outputDirectory>
+		</fileSet>
+	</fileSets>
+</assembly>
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..dae1b6f
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -0,0 +1,53 @@
+package edu.uci.ics.genomix.data.normalizers;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+
+/**
+ * Normalized key computer for 64-bit integer keys, used to speed up the
+ * aggregation sort in Hyracks.
+ */
+public class Integer64NormalizedKeyComputerFactory implements
+        INormalizedKeyComputerFactory {
+    private static final long serialVersionUID = 8735044913496854551L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer() {
+            private static final int POSITIVE_LONG_MASK = (3 << 30);
+            private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
+            private static final int NEGATIVE_LONG_MASK = (0 << 30);
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                long value = Integer64SerializerDeserializer.getLong(bytes,
+                        start);
+                int highValue = (int) (value >> 32);
+                if (highValue > 0) {
+                    /* larger than Integer.MAX_VALUE */
+                    int highNmk = getKey(highValue);
+                    highNmk >>= 2;
+                    highNmk |= POSITIVE_LONG_MASK;
+                    return highNmk;
+                } else if (highValue == 0) {
+                    /* fits in an int and is non-negative */
+                    int lowNmk = (int) value;
+                    lowNmk >>= 2;
+                    lowNmk |= NON_NEGATIVE_INT_MASK;
+                    return lowNmk;
+                } else {
+                    /* less than 0: not optimized for this case */
+                    int highNmk = getKey(highValue);
+                    highNmk >>= 2;
+                    highNmk |= NEGATIVE_LONG_MASK;
+                    return highNmk;
+                }
+            }
+
+            private int getKey(int value) {
+                return value ^ Integer.MIN_VALUE;
+            }
+        };
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..969431c
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
@@ -0,0 +1,64 @@
+package edu.uci.ics.genomix.data.normalizers;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class VLongNormalizedKeyComputerFactory implements
+        INormalizedKeyComputerFactory {
+    private static final long serialVersionUID = 8735044913496854551L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer() {
+            private static final int POSITIVE_LONG_MASK = (3 << 30);
+            private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
+            private static final int NEGATIVE_LONG_MASK = (0 << 30);
+
+            private long getLong(byte[] bytes, int offset) {
+                int l = (int) Math.ceil((double) bytes[offset] / 4.0);
+                int n = (l < 8) ? l : 8;
+
+                long r = 0;
+                for (int i = 0; i < n; i++) {
+                    r <<= 8;
+                    r += (long) (bytes[offset + i + 1] & 0xff);
+                }
+
+                return r;
+            }
+
+            /**
+             * Computes the normalized key of one k-mer.
+             */
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                long value = getLong(bytes, start);
+
+                int highValue = (int) (value >> 32);
+                if (highValue > 0) {
+                    /* larger than Integer.MAX_VALUE */
+                    int highNmk = getKey(highValue);
+                    highNmk >>= 2;
+                    highNmk |= POSITIVE_LONG_MASK;
+                    return highNmk;
+                } else if (highValue == 0) {
+                    /* fits in an int and is non-negative */
+                    int lowNmk = (int) value;
+                    lowNmk >>= 2;
+                    lowNmk |= NON_NEGATIVE_INT_MASK;
+                    return lowNmk;
+                } else {
+                    /* less than 0: not optimized for this case */
+                    int highNmk = getKey(highValue);
+                    highNmk >>= 2;
+                    highNmk |= NEGATIVE_LONG_MASK;
+                    return highNmk;
+                }
+            }
+
+            private int getKey(int value) {
+                return value ^ Integer.MIN_VALUE;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
new file mode 100644
index 0000000..5d4c624
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
@@ -0,0 +1,44 @@
+package edu.uci.ics.genomix.data.partition;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+
+public class KmerHashPartitioncomputerFactory implements
+        ITuplePartitionComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public KmerHashPartitioncomputerFactory() {
+    }
+
+    public static long getLong(byte[] bytes, int offset) {
+        return (((long) (bytes[offset] & 0xff)) << 56) + (((long) (bytes[offset + 1] & 0xff)) << 48)
+                + (((long) (bytes[offset + 2] & 0xff)) << 40) + (((long) (bytes[offset + 3] & 0xff)) << 32)
+                + (((long) (bytes[offset + 4] & 0xff)) << 24) + (((long) (bytes[offset + 5] & 0xff)) << 16)
+                + (((long) (bytes[offset + 6] & 0xff)) << 8) + (((long) (bytes[offset + 7] & 0xff)) << 0);
+    }
+
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        return new ITuplePartitionComputer() {
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex,
+                    int nParts) {
+                if (nParts == 1) {
+                    return 0;
+                }
+                int startOffset = accessor.getTupleStartOffset(tIndex);
+                int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);
+                int slotLength = accessor.getFieldSlotsLength();
+
+                ByteBuffer buf = accessor.getBuffer();
+                long l = getLong(buf.array(), startOffset
+                        + fieldOffset + slotLength);
+                return (int) ((l % nParts + nParts) % nParts); // keep the partition index non-negative
+            }
+        };
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java
new file mode 100644
index 0000000..d88c0a0
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java
@@ -0,0 +1,47 @@
+package edu.uci.ics.genomix.data.serde;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ByteSerializerDeserializer implements
+		ISerializerDeserializer<Byte> {
+
+	private static final long serialVersionUID = 1L;
+
+	public static final ByteSerializerDeserializer INSTANCE = new ByteSerializerDeserializer();
+
+	private ByteSerializerDeserializer() {
+	}
+
+	@Override
+	public Byte deserialize(DataInput in) throws HyracksDataException {
+		try {
+			return in.readByte();
+		} catch (IOException e) {
+			throw new HyracksDataException(e);
+		}
+	}
+
+	@Override
+	public void serialize(Byte instance, DataOutput out)
+			throws HyracksDataException {
+		try {
+			out.writeByte(instance.intValue());
+		} catch (IOException e) {
+			throw new HyracksDataException(e);
+		}
+	}
+
+	public static byte getByte(byte[] bytes, int offset) {
+		return bytes[offset];
+	}
+
+	public static void putByte(byte val, byte[] bytes, int offset) {
+		bytes[offset] = val;
+	}
+
+}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java
new file mode 100644
index 0000000..6477e14
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java
@@ -0,0 +1,30 @@
+package edu.uci.ics.genomix.data.std.accessors;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.data.std.api.IHashable;
+import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
+
+public class LongBinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction(final int seed) {
+
+        return new IBinaryHashFunction() {
+            private LongPointable p = new LongPointable();
+
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                if (offset + length > bytes.length)
+                    throw new IllegalStateException("out of bounds");
+                p.set(bytes, offset, length);
+                int hash = Math.abs(((IHashable) p).hash() * (seed + 1));
+                if (hash < 0) {
+                    hash = Math.abs(hash + 1);
+                }
+                return hash;
+            }
+        };
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
new file mode 100644
index 0000000..661559f
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.genomix.data.std.accessors;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+
+public class LongHashFunctionFamily implements IBinaryHashFunctionFamily {
+    public static final IBinaryHashFunctionFamily INSTANCE = new LongHashFunctionFamily();
+
+    private static final long serialVersionUID = 1L;
+
+    static final int[] primeCoefficients = { 31, 23, 53, 97, 71, 337, 11, 877,
+            3, 29 };
+
+    private LongHashFunctionFamily() {
+    }
+
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction(int seed) {
+        final int coefficient = primeCoefficients[seed % primeCoefficients.length];
+        final int r = primeCoefficients[(seed + 1) % primeCoefficients.length];
+
+        return new IBinaryHashFunction() {
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                int h = 0;
+                int utflen = UTF8StringPointable.getUTFLength(bytes, offset);
+                int sStart = offset + 2;
+                int c = 0;
+
+                while (c < utflen) {
+                    char ch = UTF8StringPointable.charAt(bytes, sStart + c);
+                    h = (coefficient * h + ch) % r;
+                    c += UTF8StringPointable.charSize(bytes, sStart + c);
+                }
+                return h;
+            }
+        };
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
new file mode 100644
index 0000000..b1db6f2
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
@@ -0,0 +1,77 @@
+package edu.uci.ics.genomix.data.std.accessors;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public class MurmurHash3BinaryHashFunctionFamily implements
+        IBinaryHashFunctionFamily {
+
+    public static final IBinaryHashFunctionFamily INSTANCE = new MurmurHash3BinaryHashFunctionFamily();
+
+    private static final long serialVersionUID = 1L;
+
+    private MurmurHash3BinaryHashFunctionFamily() {
+    }
+
+    private static final int C1 = 0xcc9e2d51;
+    private static final int C2 = 0x1b873593;
+    private static final int C3 = 5;
+    private static final int C4 = 0xe6546b64;
+    private static final int C5 = 0x85ebca6b;
+    private static final int C6 = 0xc2b2ae35;
+
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction(final int seed) {
+        return new IBinaryHashFunction() {
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                int h = seed;
+                int p = offset;
+                int remain = length;
+                while (remain > 4) {
+                    int k = ((int) bytes[p]) | (((int) bytes[p + 1]) << 8)
+                            | (((int) bytes[p + 2]) << 16)
+                            | (((int) bytes[p + 3]) << 24);
+                    k *= C1;
+                    k = Integer.rotateLeft(k, 15);
+                    k *= C2;
+                    h ^= k;
+                    h = Integer.rotateLeft(h, 13);
+                    h = h * C3 + C4;
+                    p += 4;
+                    remain -= 4;
+                }
+                int k = 0;
+                for (int i = 0; remain > 0; i += 8) {
+                    k ^= bytes[p++] << i;
+                    remain--;
+                }
+                k *= C1;
+                k = Integer.rotateLeft(k, 15);
+                k *= C2;
+                h ^= k;
+                // switch (remain) {
+                // case 3:
+                // k = bytes[p++];
+                // case 2:
+                // k = (k << 8) | bytes[p++];
+                // case 1:
+                // k = (k << 8) | bytes[p++];
+                // k *= C1;
+                // k = Integer.rotateLeft(k, 15);
+                // k *= C2;
+                // h ^= k;
+                // h = Integer.rotateLeft(h, 13);
+                // h = h * C3 + C4;
+                // }
+                h ^= length;
+                h ^= (h >>> 16);
+                h *= C5;
+                h ^= (h >>> 13);
+                h *= C6;
+                h ^= (h >>> 16);
+                return h;
+            }
+        };
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java
new file mode 100644
index 0000000..b9a0443
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java
@@ -0,0 +1,30 @@
+package edu.uci.ics.genomix.data.std.accessors;
+
+import edu.uci.ics.genomix.data.std.primitive.VLongPointable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.data.std.api.IHashable;
+
+public class VLongBinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public IBinaryHashFunction createBinaryHashFunction(final int seed) {
+
+        return new IBinaryHashFunction() {
+            private VLongPointable p = new VLongPointable();
+
+            @Override
+            public int hash(byte[] bytes, int offset, int length) {
+                if (offset + length > bytes.length)
+                    throw new IllegalStateException("out of bounds");
+                p.set(bytes, offset, length);
+                int hash = Math.abs(((IHashable) p).hash() * (seed + 1));
+                if (hash < 0) {
+                    hash = Math.abs(hash + 1);
+                }
+                return hash;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
new file mode 100644
index 0000000..c49d6ff
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
@@ -0,0 +1,146 @@
+package edu.uci.ics.genomix.data.std.primitive;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.api.AbstractPointable;
+import edu.uci.ics.hyracks.data.std.api.IComparable;
+import edu.uci.ics.hyracks.data.std.api.IHashable;
+import edu.uci.ics.hyracks.data.std.api.INumeric;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+
+public final class VLongPointable extends AbstractPointable implements
+        IHashable, IComparable, INumeric {
+    public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public boolean isFixedLength() {
+            return false;
+        }
+
+        @Override
+        public int getFixedLength() {
+            return -1;
+        }
+    };
+
+    public static final IPointableFactory FACTORY = new IPointableFactory() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IPointable createPointable() {
+            return new VLongPointable();
+        }
+
+        @Override
+        public ITypeTraits getTypeTraits() {
+            return TYPE_TRAITS;
+        }
+    };
+
+    public static long getLong(byte[] bytes, int start) {
+        int l = (int) Math.ceil((double) bytes[start] / 4.0);
+        int n = (l < 8) ? l : 8;
+
+        long r = 0;
+        for (int i = 0; i < n; i++) {
+            r <<= 8;
+            r += (long) (bytes[start + i + 1] & 0xff);
+        }
+
+        return r;
+    }
+
+    public long getLong() {
+        return getLong(bytes, start);
+    }
+
+    public byte[] postIncrement() {
+        int i = start + 1;
+        int l = (int) Math.ceil((double) bytes[start] / 4.0);
+        while (i <= start + l) {
+            bytes[i] += 1;
+            if (bytes[i] != 0) {
+                break;
+            }
+            i += 1;
+        }
+        return bytes;
+    }
+
+    @Override
+    public int compareTo(IPointable pointer) {
+        return compareTo(pointer.getByteArray(), pointer.getStartOffset(),
+                pointer.getLength());
+    }
+
+    @Override
+    public int compareTo(byte[] bytes, int start, int length) {
+
+        int be = this.start;
+
+        if (this.bytes[be] != bytes[start]) {
+            return (this.bytes[be] < bytes[start]) ? -1 : 1;
+        }
+
+        int n = this.bytes[be];
+        int l = (int) Math.ceil((double) n / 4.0);
+        for (int i = l; i > 0; i--) {
+            if (this.bytes[be + i] < bytes[start + i]) {
+                return -1;
+            } else if (this.bytes[be + i] > bytes[start + i]) {
+                return 1;
+            }
+        }
+        return 0;
+    }
+
+    @Override
+    public int hash() { // BKDR-style hash
+        int hash = 1;
+        for (int i = start + 1; i <= start + length; i++)
+            hash = (31 * hash) + (int) bytes[i];
+        if (hash < 0) {
+            hash = -hash;
+        }
+        // System.err.println(hash);
+        return hash;
+        /*
+         * int seed = 131; // 31 131 1313 13131 131313 etc.. int hash = 0; int l
+         * = (int) Math.ceil((double) bytes[start] / 4.0); for (int i = start +
+         * 1; i <= start + l; i++) { hash = hash * seed + bytes[i]; } return
+         * (hash & 0x7FFFFFFF);
+         */
+    }
+
+    @Override
+    public byte byteValue() {
+        return bytes[start + 1];
+    }
+
+    @Override
+    public short shortValue() {
+
+        return (short) (((bytes[start + 2] & 0xff) << 8) + (bytes[start + 1] & 0xff));
+    }
+
+    @Override
+    public int intValue() {
+        return ((bytes[start + 4] & 0xff) << 24) + ((bytes[start + 3] & 0xff) << 16) + ((bytes[start + 2] & 0xff) << 8) + (bytes[start + 1] & 0xff);
+    }
+
+    @Override
+    public long longValue() {
+        return getLong();
+    }
+
+    @Override
+    public float floatValue() {
+        return getLong();
+    }
+
+    @Override
+    public double doubleValue() {
+        return getLong();
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..d3de2ba
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,30 @@
+package edu.uci.ics.genomix.dataflow;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+/**
+ * Connector policy assignment used by the pre-clustered group-by plan.
+ * 
+ */
+public class ConnectorPolicyAssignmentPolicy implements
+        IConnectorPolicyAssignmentPolicy {
+    private static final long serialVersionUID = 1L;
+    private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+
+    @Override
+    public IConnectorPolicy getConnectorPolicyAssignment(
+            IConnectorDescriptor c, int nProducers, int nConsumers,
+            int[] fanouts) {
+        if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+            return senderSideMaterializePolicy;
+        } else {
+            return pipeliningPolicy;
+        }
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
new file mode 100644
index 0000000..ec71111
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -0,0 +1,176 @@
+package edu.uci.ics.genomix.dataflow;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+
+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.type.Kmer;
+import edu.uci.ics.genomix.type.Kmer.GENE_CODE;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+public class FileScanDescriptor extends
+        AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private int k;
+    private String pathSuffix;
+    private int byteNum;
+
+    public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k,
+            String path) {
+        super(spec, 0, 1);
+        this.k = k;
+        this.pathSuffix = path;
+
+        byteNum = (int) Math.ceil((double) k / 4.0);
+        // recordDescriptors[0] = new RecordDescriptor(
+        // new ISerializerDeserializer[] {
+        // UTF8StringSerializerDeserializer.INSTANCE });
+        recordDescriptors[0] = new RecordDescriptor(
+                new ISerializerDeserializer[] { null, null });
+    }
+
+    public FileScanDescriptor(JobSpecification jobSpec, int kmers,
+            Path[] inputPaths) {
+        super(jobSpec, 0, 1);
+        this.k = kmers;
+        this.pathSuffix = inputPaths[0].toString();
+        // recordDescriptors[0] = new RecordDescriptor(
+        // new ISerializerDeserializer[] {
+        // UTF8StringSerializerDeserializer.INSTANCE });
+        recordDescriptors[0] = new RecordDescriptor(
+                new ISerializerDeserializer[] { null,
+                        ByteSerializerDeserializer.INSTANCE });
+    }
+
+    public IOperatorNodePushable createPushRuntime(
+            final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition,
+            int nPartitions) {
+
+        final int temp = partition;
+
+        // one push runtime per partition
+        return (IOperatorNodePushable) new AbstractUnaryOutputSourceOperatorNodePushable() {
+            private ArrayTupleBuilder tupleBuilder;
+            private ByteBuffer outputBuffer;
+            private FrameTupleAppender outputAppender;
+
+            @Override
+            public void initialize() {
+
+                tupleBuilder = new ArrayTupleBuilder(2);
+                outputBuffer = ctx.allocateFrame();
+                outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+                outputAppender.reset(outputBuffer, true);
+
+                try { // one try with multiple catch?
+                    writer.open();
+                    String s = pathSuffix + String.valueOf(temp);
+
+                    File tf = new File(s);
+
+                    File[] fa = tf.listFiles();
+
+                    for (int i = 0; i < fa.length; i++) {
+                        BufferedReader readsfile = new BufferedReader(
+                                new InputStreamReader(
+                                        new FileInputStream(fa[i])));
+                        String read = readsfile.readLine();
+                        // int count = 0;
+                        while (read != null) {
+                            // if(count % 4 == 1)
+                            Pattern genePattern = Pattern.compile("[AGCT]+");
+                            Matcher geneMatcher = genePattern.matcher(read);
+                            boolean isValid = geneMatcher.matches();
+                            if (isValid) {
+                                SplitReads(read.getBytes(), writer);
+                            }
+                            // count += 1;
+                            // System.err.println(count);
+                            read = readsfile.readLine();
+                        }
+                    }
+                    if (outputAppender.getTupleCount() > 0) {
+                        FrameUtils.flushFrame(outputBuffer, writer);
+                    }
+                    outputAppender = null;
+                    outputBuffer = null;
+                    // sort code for external sort here?
+                    writer.close();
+                } catch (Exception e) {
+                    // fail the task on any I/O or parsing error
+                    throw new IllegalStateException(e);
+                }
+            }
+
+            private void SplitReads(byte[] array, IFrameWriter writer) {
+                /* first k-mer */
+                byte[] kmer = Kmer.CompressKmer(k, array, 0);
+                byte pre = 0;
+                byte next = GENE_CODE.getAdjBit(array[k]);
+                InsertToFrame(kmer, pre, next, writer);
+
+                /* middle k-mers */
+                for (int i = k; i < array.length - 1; i++) {
+                    pre = Kmer.MoveKmer(k, kmer, array[i]);
+                    next = GENE_CODE.getAdjBit(array[i + 1]);
+                    InsertToFrame(kmer, pre, next, writer);
+
+                }
+                /* last k-mer */
+                pre = Kmer.MoveKmer(k, kmer, array[array.length - 1]);
+                next = 0;
+                InsertToFrame(kmer, pre, next, writer);
+            }
+
+            private void InsertToFrame(byte[] kmer, byte pre, byte next,
+                    IFrameWriter writer) {
+                try {
+                    byte adj = GENE_CODE.mergePreNextAdj(pre, next);
+                    tupleBuilder.reset();
+                    tupleBuilder.addField(kmer, 0, byteNum);
+                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,
+                            adj);
+
+                    if (!outputAppender.append(
+                            tupleBuilder.getFieldEndOffsets(),
+                            tupleBuilder.getByteArray(), 0,
+                            tupleBuilder.getSize())) {
+                        FrameUtils.flushFrame(outputBuffer, writer);
+                        outputAppender.reset(outputBuffer, true);
+                        if (!outputAppender.append(
+                                tupleBuilder.getFieldEndOffsets(),
+                                tupleBuilder.getByteArray(), 0,
+                                tupleBuilder.getSize())) {
+                            throw new IllegalStateException(
+                                    "Failed to copy a record into a frame: the record size is too large.");
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+        };
+
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
new file mode 100644
index 0000000..0ef10cf
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
@@ -0,0 +1,73 @@
+package edu.uci.ics.genomix.dataflow;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.dataflow.util.NonSyncWriter;
+import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+
+@SuppressWarnings("deprecation")
+public class KMerSequenceWriterFactory implements ITupleWriterFactory {
+
+	private static final long serialVersionUID = 1L;
+	private ConfFactory confFactory;
+
+	public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
+		this.confFactory = new ConfFactory(conf);
+	}
+
+	public class TupleWriter implements ITupleWriter {
+		public TupleWriter(ConfFactory cf) {
+			this.cf = cf;
+		}
+
+		ConfFactory cf;
+		Writer writer = null;
+//		NonSyncWriter writer = null;
+
+		KmerCountValue reEnterCount = new KmerCountValue();
+		/**
+		 * Assumption: the writer never changes the source tuple data.
+		 */
+		@Override
+		public void write(DataOutput output, ITupleReference tuple)
+				throws HyracksDataException {
+			try {
+				if (writer == null) {
+//					writer = new NonSyncWriter((FSDataOutputStream) output);
+					writer = SequenceFile.createWriter(cf.getConf(),
+							(FSDataOutputStream) output, BytesWritable.class,
+							BytesWritable.class, CompressionType.NONE, null);
+				}
+				byte[] kmer = tuple.getFieldData(0);
+				int keyStart = tuple.getFieldStart(0);
+				int keyLength = tuple.getFieldLength(0);
+				
+				byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
+				byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
+				reEnterCount.reset(bitmap, count);
+				writer.appendRaw(kmer, keyStart, keyLength, reEnterCount);
+			} catch (IOException e) {
+				throw new HyracksDataException(e);
+			}
+		}
+	}
+
+	@Override
+	public ITupleWriter getTupleWriter() {
+		return new TupleWriter(confFactory);
+	}
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
new file mode 100644
index 0000000..332b7d7
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
@@ -0,0 +1,50 @@
+package edu.uci.ics.genomix.dataflow;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.type.Kmer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class KMerTextWriterFactory implements ITupleWriterFactory {
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+	private final int KMER;
+
+	public KMerTextWriterFactory(int kmer) {
+		KMER = kmer;
+	}
+
+	public class TupleWriter implements ITupleWriter {
+		@Override
+		public void write(DataOutput output, ITupleReference tuple)
+				throws HyracksDataException {
+			try {
+				output.write(Kmer.recoverKmerFrom(KMER,
+						tuple.getFieldData(0), tuple.getFieldStart(0),
+						tuple.getFieldLength(0)).getBytes());
+				output.writeByte('\t');
+				output.write(Kmer.GENE_CODE.getSymbolFromBitMap(tuple
+						.getFieldData(1)[tuple.getFieldStart(1)]).getBytes());
+				output.writeByte('\t');
+				output.write(String.valueOf((int)tuple
+						.getFieldData(2)[tuple.getFieldStart(2)]).getBytes());
+				output.writeByte('\n');
+			} catch (IOException e) {
+				throw new HyracksDataException(e);
+			}
+		}
+	}
+
+	@Override
+	public ITupleWriter getTupleWriter() {
+		return new TupleWriter();
+	}
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
new file mode 100644
index 0000000..f8decc3
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
@@ -0,0 +1,151 @@
+package edu.uci.ics.genomix.dataflow;
+
+import java.io.BufferedWriter;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class PrinterOperatorDescriptor extends
+        AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    private String filename;
+    private boolean writeFile;
+    private BufferedWriter twriter;
+    private FileOutputStream stream;
+
+    /**
+     * An operator that prints every field of each incoming tuple.
+     * 
+     * The single-argument constructor prints to standard error; the
+     * two-argument constructor writes one text file per partition, using the
+     * given file name prefix with the partition id and ".txt" appended.
+     * 
+     * @param spec
+     *            the JobSpecification object
+     * @throws HyracksException
+     */
+    public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec) {
+        super(spec, 1, 0);
+        writeFile = false;
+    }
+
+    public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            String filename) {
+        super(spec, 1, 0);
+        this.filename = filename;
+        writeFile = true;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(
+            final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider,
+            final int partition, final int nPartitions)
+            throws HyracksDataException {
+
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+            private RecordDescriptor inputRd = recordDescProvider
+                    .getInputRecordDescriptor(getActivityId(), 0);
+            private FrameTupleAccessor accessor = new FrameTupleAccessor(
+                    ctx.getFrameSize(), inputRd);
+            private FrameTupleReference tuple = new FrameTupleReference();
+
+            @Override
+            public void open() throws HyracksDataException {
+                if (true == writeFile) {
+                    try {
+                        filename = filename + String.valueOf(partition)
+                                + ".txt";
+                        // System.err.println(filename);
+                        stream = new FileOutputStream(filename);
+                    } catch (FileNotFoundException e) {
+                        // fail the task instead of continuing with a null stream
+                        throw new HyracksDataException(e);
+                    }
+                    twriter = new BufferedWriter(new OutputStreamWriter(stream));
+                }
+            }
+
+            private void PrintBytes(int no) {
+                try {
+
+                    byte[] bytes = tuple.getFieldData(no);
+                    int offset = tuple.getFieldStart(no);
+                    int length = tuple.getFieldLength(no);
+                    if (true == writeFile) {
+                        for (int j = offset; j < offset + length; j++) {
+                            twriter.write(String.valueOf((int) bytes[j]));
+                            twriter.write(" ");
+                        }
+                        twriter.write("&&");
+                    } else {
+                        for (int j = offset; j < offset + length; j++) {
+                            System.err.print(String.valueOf((int) bytes[j]));
+                            System.err.print(" ");
+                        }
+                        System.err.print("&&");
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer)
+                    throws HyracksDataException {
+                try {
+                    accessor.reset(buffer);
+                    int tupleCount = accessor.getTupleCount();
+                    for (int i = 0; i < tupleCount; i++) {
+                        tuple.reset(accessor, i);
+                        int tj = tuple.getFieldCount();
+                        for (int j = 0; j < tj; j++) {
+                            PrintBytes(j);
+                        }
+                        if (true == writeFile) {
+                            twriter.write("\r\n");
+                        } else {
+                            System.err.println("");
+                        }
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                if (true == writeFile) {
+                    try {
+                        twriter.close();
+                        stream.close();
+                    } catch (IOException e) {
+                        // surface the failure to the framework
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+        };
+    }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
new file mode 100644
index 0000000..1f0cd73
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
@@ -0,0 +1,112 @@
+package edu.uci.ics.genomix.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.type.Kmer;
+import edu.uci.ics.genomix.type.Kmer.GENE_CODE;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+public class ReadsKeyValueParserFactory implements
+        IKeyValueParserFactory<LongWritable, Text> {
+    private static final long serialVersionUID = 1L;
+
+    private int k;
+    private int byteNum;
+
+    public ReadsKeyValueParserFactory(int k) {
+        this.k = k;
+        byteNum = (int) Math.ceil((double) k / 4.0);
+    }
+
+    @Override
+    public IKeyValueParser<LongWritable, Text> createKeyValueParser(
+            final IHyracksTaskContext ctx) {
+        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+        final ByteBuffer outputBuffer = ctx.allocateFrame();
+        final FrameTupleAppender outputAppender = new FrameTupleAppender(
+                ctx.getFrameSize());
+        outputAppender.reset(outputBuffer, true);
+
+        return new IKeyValueParser<LongWritable, Text>() {
+
+            @Override
+            public void parse(LongWritable key, Text value, IFrameWriter writer)
+                    throws HyracksDataException {
+                String geneLine = value.toString(); // read the raw gene line
+                Pattern genePattern = Pattern.compile("[AGCT]+");
+                Matcher geneMatcher = genePattern.matcher(geneLine);
+                boolean isValid = geneMatcher.matches();
+                if (isValid) {
+                    SplitReads(geneLine.getBytes(), writer);
+                }
+            }
+
+            @Override
+            public void flush(IFrameWriter writer) throws HyracksDataException {
+                FrameUtils.flushFrame(outputBuffer, writer);
+            }
+
+            private void SplitReads(byte[] array, IFrameWriter writer) {
+                /* first k-mer */
+                byte[] kmer = Kmer.CompressKmer(k, array, 0);
+                byte pre = 0;
+                byte next = GENE_CODE.getAdjBit(array[k]);
+                InsertToFrame(kmer, pre, next, writer);
+
+                /* middle k-mers */
+                for (int i = k; i < array.length - 1; i++) {
+                    pre = Kmer.MoveKmer(k, kmer, array[i]);
+                    next = GENE_CODE.getAdjBit(array[i + 1]);
+                    InsertToFrame(kmer, pre, next, writer);
+
+                }
+                /* last k-mer */
+                pre = Kmer.MoveKmer(k, kmer, array[array.length - 1]);
+                next = 0;
+                InsertToFrame(kmer, pre, next, writer);
+            }
+
+            private void InsertToFrame(byte[] kmer, byte pre, byte next,
+                    IFrameWriter writer) {
+                try {
+                    byte adj = GENE_CODE.mergePreNextAdj(pre, next);
+                    tupleBuilder.reset();
+                    tupleBuilder.addField(kmer, 0, byteNum);
+                    tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,
+                            adj);
+
+                    if (!outputAppender.append(
+                            tupleBuilder.getFieldEndOffsets(),
+                            tupleBuilder.getByteArray(), 0,
+                            tupleBuilder.getSize())) {
+                        FrameUtils.flushFrame(outputBuffer, writer);
+                        outputAppender.reset(outputBuffer, true);
+                        if (!outputAppender.append(
+                                tupleBuilder.getFieldEndOffsets(),
+                                tupleBuilder.getByteArray(), 0,
+                                tupleBuilder.getSize())) {
+                            throw new IllegalStateException(
+                                    "Failed to copy a record into a frame: the record size is too large.");
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+        };
+    }
+
+}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
new file mode 100644
index 0000000..6256f86
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -0,0 +1,422 @@
+package edu.uci.ics.genomix.dataflow;

+

+import java.io.File;

+import java.util.logging.Level;

+import java.util.logging.Logger;

+

+import edu.uci.ics.genomix.data.normalizers.VLongNormalizedKeyComputerFactory;

+import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;

+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

+import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;

+import edu.uci.ics.genomix.data.std.primitive.VLongPointable;

+import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;

+import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;

+import edu.uci.ics.hyracks.api.client.HyracksConnection;

+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;

+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;

+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;

+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;

+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

+import edu.uci.ics.hyracks.api.job.JobId;

+import edu.uci.ics.hyracks.api.job.JobSpecification;

+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;

+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;

+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;

+import edu.uci.ics.hyracks.control.nc.NodeControllerService;

+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;

+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;

+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;

+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;

+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;

+

+public class Tester {

+

+	private static final Logger LOGGER = Logger.getLogger(Tester.class

+			.getName());

+	public static final String NC1_ID = "nc1";

+	public static final String NC2_ID = "nc2";

+	public static final String NC3_ID = "nc3";

+	public static final String NC4_ID = "nc4";

+

+	private static ClusterControllerService cc;

+	private static NodeControllerService nc1;

+	private static NodeControllerService nc2;

+	private static NodeControllerService nc3;

+	private static NodeControllerService nc4;

+	private static IHyracksClientConnection hcc;

+

+	// private static final boolean DEBUG = true;

+

+	public static void main(String[] args) throws Exception {

+

+		try {

+			LOGGER.setLevel(Level.OFF);

+

+			init();

+

+			// Options options = new Options();

+

+			IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1",

+					39000);

+

+			/*

+			 * JobSpecification job =

+			 * createJob(parseFileSplits(options.inFileCustomerSplits),

+			 * parseFileSplits(options.inFileOrderSplits),

+			 * parseFileSplits(options.outFileSplits),

+			 * options.numJoinPartitions, options.algo, options.graceInputSize,

+			 * options.graceRecordsPerFrame, options.graceFactor,

+			 * options.memSize, options.tableSize, options.hasGroupBy);

+			 */

+

+			int k, page_num;

+			String file_name = args[0];

+			k = Integer.parseInt(args[1]);

+			page_num = Integer.parseInt(args[2]);

+			int type = Integer.parseInt(args[3]);

+

+			JobSpecification job = createJob(file_name, k, page_num, type);

+

+			long start = System.currentTimeMillis();

+			JobId jobId = hcc.startJob("test", job);

+			hcc.waitForCompletion(jobId);

+			long end = System.currentTimeMillis();

+			System.err.println(start + " " + end + " " + (end - start));

+		} finally {

+		}

+		/*

+		 * 

+		 * String s = "g:\\data\\results.txt" ;

+		 * 

+		 * filenames = new FileOutputStream(s); // filenames = new

+		 * FileInputStream("filename.txt");

+		 * 

+		 * BufferedWriter writer = new BufferedWriter(new

+		 * OutputStreamWriter(filenames)); writer.write((int) (end-start));

+		 * writer.close();

+		 */

+

+	}

+

+	public static void init() throws Exception {

+		CCConfig ccConfig = new CCConfig();

+		ccConfig.clientNetIpAddress = "127.0.0.1";

+		ccConfig.clientNetPort = 39000;

+		ccConfig.clusterNetIpAddress = "127.0.0.1";

+		ccConfig.clusterNetPort = 39001;

+		ccConfig.profileDumpPeriod = -1;

+		File outDir = new File("target/ClusterController");

+		outDir.mkdirs();

+		File ccRoot = File.createTempFile(Tester.class.getName(), ".data",

+				outDir);

+		ccRoot.delete();

+		ccRoot.mkdir();

+		ccConfig.ccRoot = ccRoot.getAbsolutePath();

+		cc = new ClusterControllerService(ccConfig);

+		cc.start();

+		ccConfig.defaultMaxJobAttempts = 0;

+

+		NCConfig ncConfig1 = new NCConfig();

+		ncConfig1.ccHost = "localhost";

+		ncConfig1.ccPort = 39001;

+		ncConfig1.clusterNetIPAddress = "127.0.0.1";

+		ncConfig1.dataIPAddress = "127.0.0.1";

+		ncConfig1.nodeId = NC1_ID;

+		nc1 = new NodeControllerService(ncConfig1);

+		nc1.start();

+

+//		NCConfig ncConfig2 = new NCConfig();

+//		ncConfig2.ccHost = "localhost";

+//		ncConfig2.ccPort = 39001;

+//		ncConfig2.clusterNetIPAddress = "127.0.0.1";

+//		ncConfig2.dataIPAddress = "127.0.0.1";

+//		ncConfig2.nodeId = NC2_ID;

+//		nc2 = new NodeControllerService(ncConfig2);

+//		nc2.start();

+//		NCConfig ncConfig3 = new NCConfig();

+//		ncConfig3.ccHost = "localhost";

+//		ncConfig3.ccPort = 39001;

+//		ncConfig3.clusterNetIPAddress = "127.0.0.1";

+//		ncConfig3.dataIPAddress = "127.0.0.1";

+//		ncConfig3.nodeId = NC3_ID;

+//		nc3 = new NodeControllerService(ncConfig3);

+//		nc3.start();

+//		NCConfig ncConfig4 = new NCConfig();

+//		ncConfig4.ccHost = "localhost";

+//		ncConfig4.ccPort = 39001;

+//		ncConfig4.clusterNetIPAddress = "127.0.0.1";

+//		ncConfig4.dataIPAddress = "127.0.0.1";

+//		ncConfig4.nodeId = NC4_ID;

+//		nc4 = new NodeControllerService(ncConfig4);

+//		nc4.start();

+

+		hcc = new HyracksConnection(ccConfig.clientNetIpAddress,

+				ccConfig.clientNetPort);

+		hcc.createApplication("test", null);

+		if (LOGGER.isLoggable(Level.INFO)) {

+			LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());

+		}

+	}

+

+	private static JobSpecification createJob(String filename, int k,

+			int page_num, int type) throws HyracksDataException {

+		JobSpecification spec = new JobSpecification();

+

+		// spec.setFrameSize(32768);

+		spec.setFrameSize(32768);

+

+		FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);

+//		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,

+//				NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+		 PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,

+		 NC1_ID);

+

+		RecordDescriptor outputRec = new RecordDescriptor(

+				new ISerializerDeserializer[] { null,

+						ByteSerializerDeserializer.INSTANCE,

+						ByteSerializerDeserializer.INSTANCE });

+		// Integer64SerializerDeserializer.INSTANCE,

+		// ByteSerializerDeserializer.INSTANCE,

+		// ByteSerializerDeserializer.INSTANCE });

+

+		int[] keyFields = new int[] { 0 };

+		int frameLimits = 4096; // hyracks oriented

+		int tableSize = 10485767; // hyracks oriented

+

+		AbstractOperatorDescriptor single_grouper;

+		IConnectorDescriptor conn_partition;

+		AbstractOperatorDescriptor cross_grouper;

+

+		if (0 == type) {// external group by

+			single_grouper = new ExternalGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new VLongNormalizedKeyComputerFactory(),

+					new MergeKmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					outputRec,

+					new HashSpillableTableFactory(

+							new FieldHashPartitionComputerFactory(

+									keyFields,

+									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+											.of(VLongPointable.FACTORY) }),

+							tableSize), true);

+

+			conn_partition = new MToNPartitioningConnectorDescriptor(spec,

+					new KmerHashPartitioncomputerFactory());

+			cross_grouper = new ExternalGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new VLongNormalizedKeyComputerFactory(),

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					outputRec,

+					new HashSpillableTableFactory(

+							new FieldHashPartitionComputerFactory(

+									keyFields,

+									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+											.of(VLongPointable.FACTORY) }),

+							tableSize), true);

+		} else if (1 == type) {

+			single_grouper = new ExternalGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new VLongNormalizedKeyComputerFactory(),

+					new MergeKmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					outputRec,

+					new HashSpillableTableFactory(

+							new FieldHashPartitionComputerFactory(

+									keyFields,

+									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+											.of(VLongPointable.FACTORY) }),

+							tableSize), true);

+			conn_partition = new MToNPartitioningMergingConnectorDescriptor(

+					spec,

+					new KmerHashPartitioncomputerFactory(),

+					keyFields,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) });

+			cross_grouper = new PreclusteredGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new DistributedMergeLmerAggregateFactory(), outputRec);

+		} else {

+			long inputSizeInRawRecords = 154000000;

+			long inputSizeInUniqueKeys = 38500000;

+			int recordSizeInBytes = 4;

+			int hashfuncStartLevel = 1;

+			single_grouper = new HybridHashGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					inputSizeInRawRecords,

+					inputSizeInUniqueKeys,

+					recordSizeInBytes,

+					tableSize,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },

+					// new IBinaryHashFunctionFamily[]

+					// {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

+					hashfuncStartLevel,

+					new VLongNormalizedKeyComputerFactory(),

+					new MergeKmerAggregateFactory(),

+					new DistributedMergeLmerAggregateFactory(), outputRec, true);

+			conn_partition = new MToNPartitioningConnectorDescriptor(spec,

+					new KmerHashPartitioncomputerFactory());

+			recordSizeInBytes = 13;

+			cross_grouper = new HybridHashGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					inputSizeInRawRecords,

+					inputSizeInUniqueKeys,

+					recordSizeInBytes,

+					tableSize,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },

+					// new IBinaryHashFunctionFamily[]

+					// {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

+					hashfuncStartLevel,

+					new VLongNormalizedKeyComputerFactory(),

+					new DistributedMergeLmerAggregateFactory(),

+					new DistributedMergeLmerAggregateFactory(), outputRec, true);

+		}

+

+		 PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+		 single_grouper, NC1_ID);

+//		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+//				single_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+

+		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(

+				spec);

+		spec.connect(readfileConn, scan, 0, single_grouper, 0);

+

+		 PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+		 cross_grouper, NC1_ID);

+//		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+//				cross_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+		spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);

+

+		// PrinterOperatorDescriptor printer = new

+		// PrinterOperatorDescriptor(spec);

+		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec,

+				"result");

+//		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,

+//				NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,

+				NC1_ID);

+

+		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);

+		spec.connect(printConn, cross_grouper, 0, printer, 0);

+		// spec.connect(readfileConn, scan, 0, printer, 0);

+

+		spec.addRoot(printer);

+

+		if (1 == type) {

+			spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());

+		}

+		// System.out.println(spec.toString());

+		return spec;

+	}

+

+	static class JoinComparatorFactory implements ITuplePairComparatorFactory {

+		private static final long serialVersionUID = 1L;

+

+		private final IBinaryComparatorFactory bFactory;

+		private final int pos0;

+		private final int pos1;

+

+		public JoinComparatorFactory(IBinaryComparatorFactory bFactory,

+				int pos0, int pos1) {

+			this.bFactory = bFactory;

+			this.pos0 = pos0;

+			this.pos1 = pos1;

+		}

+

+		@Override

+		public ITuplePairComparator createTuplePairComparator(

+				IHyracksTaskContext ctx) {

+			return new JoinComparator(bFactory.createBinaryComparator(), pos0,

+					pos1);

+		}

+	}

+

+	static class JoinComparator implements ITuplePairComparator {

+

+		private final IBinaryComparator bComparator;

+		private final int field0;

+		private final int field1;

+

+		public JoinComparator(IBinaryComparator bComparator, int field0,

+				int field1) {

+			this.bComparator = bComparator;

+			this.field0 = field0;

+			this.field1 = field1;

+		}

+

+		@Override

+		public int compare(IFrameTupleAccessor accessor0, int tIndex0,

+				IFrameTupleAccessor accessor1, int tIndex1) {

+			int tStart0 = accessor0.getTupleStartOffset(tIndex0);

+			int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;

+

+			int tStart1 = accessor1.getTupleStartOffset(tIndex1);

+			int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;

+

+			int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);

+			int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);

+			int fLen0 = fEnd0 - fStart0;

+

+			int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);

+			int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);

+			int fLen1 = fEnd1 - fStart1;

+

+			int c = bComparator.compare(accessor0.getBuffer().array(), fStart0

+					+ fStartOffset0, fLen0, accessor1.getBuffer().array(),

+					fStart1 + fStartOffset1, fLen1);

+			return c;

+		}

+	}

+

+}

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
new file mode 100644
index 0000000..00f4256
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -0,0 +1,139 @@
+package edu.uci.ics.genomix.dataflow.aggregators;

+

+import java.io.DataOutput;

+import java.io.IOException;

+

+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;

+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

+

+/**

+ * Merge aggregator: ORs adjacency bitmaps and sums counts of partial kmer aggregates (capped at 127)

+ * 

+ */

+public class DistributedMergeLmerAggregateFactory implements

+		IAggregatorDescriptorFactory {

+	private static final long serialVersionUID = 1L;

+	private static final int MAX = 127;

+

+	public DistributedMergeLmerAggregateFactory() {

+	}

+

+	@Override

+	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,

+			RecordDescriptor inRecordDescriptor,

+			RecordDescriptor outRecordDescriptor, int[] keyFields,

+			int[] keyFieldsInPartialResults) throws HyracksDataException {

+		return new IAggregatorDescriptor() {

+

+			@Override

+			public void reset() {

+			}

+

+			@Override

+			public void close() {

+				// TODO Auto-generated method stub

+

+			}

+

+			@Override

+			public AggregateState createAggregateStates() {

+				// TODO Auto-generated method stub

+				return new AggregateState(new Object() {

+				});

+			}

+

+			private byte getField(IFrameTupleAccessor accessor, int tIndex,

+					int fieldId) {

+				int tupleOffset = accessor.getTupleStartOffset(tIndex);

+				int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);

+				int offset = tupleOffset + fieldStart

+						+ accessor.getFieldSlotsLength();

+				byte data = ByteSerializerDeserializer.getByte(accessor

+						.getBuffer().array(), offset);

+				return data;

+			}

+

+			/**

+			 * first partial result for a kmer: copy its bitmap and count

+			 */

+			@Override

+			public void init(ArrayTupleBuilder tupleBuilder,

+					IFrameTupleAccessor accessor, int tIndex,

+					AggregateState state) throws HyracksDataException {

+

+				byte bitmap = getField(accessor, tIndex, 1);

+				byte count = getField(accessor, tIndex, 2);

+				DataOutput fieldOutput = tupleBuilder.getDataOutput();

+				try {

+					fieldOutput.writeByte(bitmap);

+					tupleBuilder.addFieldEndOffset();

+					fieldOutput.writeByte(count);

+					tupleBuilder.addFieldEndOffset();

+				} catch (IOException e) {

+					throw new HyracksDataException(

+							"I/O exception when initializing the aggregator.");

+				}

+			}

+

+			@Override

+			public void aggregate(IFrameTupleAccessor accessor, int tIndex,

+					IFrameTupleAccessor stateAccessor, int stateTupleIndex,

+					AggregateState state) throws HyracksDataException {

+				byte bitmap = getField(accessor, tIndex, 1);

+				short count = getField(accessor, tIndex, 2);

+

+				int statetupleOffset = stateAccessor

+						.getTupleStartOffset(stateTupleIndex);

+				int statefieldStart = stateAccessor.getFieldStartOffset(

+						stateTupleIndex, 1);

+				int stateoffset = statetupleOffset

+						+ stateAccessor.getFieldSlotsLength() + statefieldStart;

+				

+				byte[] data = stateAccessor.getBuffer().array();

+
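
+				// merge this partial result into the running state: OR the

+				// adjacency bitmaps and add the counts, capping the total at MAX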

+				bitmap |= data[stateoffset];

+				count += data[stateoffset + 1];

+				if (count >= MAX) {

+					count = (byte) MAX;

+				}

+				data[stateoffset] = bitmap;

+				data[stateoffset + 1] = (byte) count;

+			}

+

+			@Override

+			public void outputPartialResult(ArrayTupleBuilder tupleBuilder,

+					IFrameTupleAccessor accessor, int tIndex,

+					AggregateState state) throws HyracksDataException {

+				byte bitmap = getField(accessor, tIndex, 1);

+				byte count = getField(accessor, tIndex, 2);

+				DataOutput fieldOutput = tupleBuilder.getDataOutput();

+				try {

+					fieldOutput.writeByte(bitmap);

+					tupleBuilder.addFieldEndOffset();

+					fieldOutput.writeByte(count);

+					tupleBuilder.addFieldEndOffset();

+				} catch (IOException e) {

+					throw new HyracksDataException(

+							"I/O exception when writing aggregation to the output buffer.");

+				}

+

+			}

+

+			@Override

+			public void outputFinalResult(ArrayTupleBuilder tupleBuilder,

+					IFrameTupleAccessor accessor, int tIndex,

+					AggregateState state) throws HyracksDataException {

+				outputPartialResult(tupleBuilder, accessor, tIndex, state);

+			}

+

+		};

+	}

+

+}

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
new file mode 100644
index 0000000..32c50bb
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -0,0 +1,136 @@
+package edu.uci.ics.genomix.dataflow.aggregators;

+

+import java.io.DataOutput;

+import java.io.IOException;

+

+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;

+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;

+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

+

+/**

+ * First-level aggregator: ORs adjacency bitmaps and counts kmer occurrences (capped at 127)

+ * 

+ */

+public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {

+	private static final long serialVersionUID = 1L;

+	private static final int MAX = 127;

+

+	public MergeKmerAggregateFactory() {

+	}

+

+	@Override

+	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,

+			RecordDescriptor inRecordDescriptor,

+			RecordDescriptor outRecordDescriptor, int[] keyFields,

+			int[] keyFieldsInPartialResults) throws HyracksDataException {

+		return new IAggregatorDescriptor() {

+

+			@Override

+			public void reset() {

+			}

+

+			@Override

+			public void close() {

+				// TODO Auto-generated method stub

+

+			}

+

+			@Override

+			public AggregateState createAggregateStates() {

+				// TODO Auto-generated method stub

+				return new AggregateState(new Object() {

+				});

+			}

+

+			private byte getField(IFrameTupleAccessor accessor, int tIndex,

+					int fieldId) {

+				int tupleOffset = accessor.getTupleStartOffset(tIndex);

+				int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);

+				int offset = tupleOffset + fieldStart

+						+ accessor.getFieldSlotsLength();

+				byte data = ByteSerializerDeserializer.getByte(accessor

+						.getBuffer().array(), offset);

+				return data;

+			}

+
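
+			/**

+			 * first occurrence of a kmer in this group: copy its adjacency

+			 * bitmap and start the count at 1

+			 */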

+			@Override

+			public void init(ArrayTupleBuilder tupleBuilder,

+					IFrameTupleAccessor accessor, int tIndex,

+					AggregateState state) throws HyracksDataException {

+				byte bitmap = getField(accessor, tIndex, 1);

+				byte count = 1;

+

+				DataOutput fieldOutput = tupleBuilder.getDataOutput();

+				try {

+					fieldOutput.writeByte(bitmap);

+					tupleBuilder.addFieldEndOffset();

+					fieldOutput.writeByte(count);

+					tupleBuilder.addFieldEndOffset();

+				} catch (IOException e) {

+					throw new HyracksDataException(

+							"I/O exception when initializing the aggregator.");

+				}

+

+			}

+

+			@Override

+			public void aggregate(IFrameTupleAccessor accessor, int tIndex,

+					IFrameTupleAccessor stateAccessor, int stateTupleIndex,

+					AggregateState state) throws HyracksDataException {

+				byte bitmap = getField(accessor, tIndex, 1);

+				short count = 1;

+

+				int statetupleOffset = stateAccessor

+						.getTupleStartOffset(stateTupleIndex);

+				int statefieldStart = stateAccessor.getFieldStartOffset(

+						stateTupleIndex, 1);

+				int stateoffset = statetupleOffset

+						+ stateAccessor.getFieldSlotsLength() + statefieldStart;

+

+				byte[] data = stateAccessor.getBuffer().array();

+

+				bitmap |= data[stateoffset];

+				count += data[stateoffset + 1];

+				if (count >= MAX) {

+					count = (byte) MAX;

+				}

+				data[stateoffset] = bitmap;

+				data[stateoffset + 1] = (byte) count;

+			}

+

+			@Override

+			public void outputPartialResult(ArrayTupleBuilder tupleBuilder,

+					IFrameTupleAccessor accessor, int tIndex,

+					AggregateState state) throws HyracksDataException {

+				byte bitmap = getField(accessor, tIndex, 1);

+				byte count = getField(accessor, tIndex, 2);

+				DataOutput fieldOutput = tupleBuilder.getDataOutput();

+				try {

+					fieldOutput.writeByte(bitmap);

+					tupleBuilder.addFieldEndOffset();

+					fieldOutput.writeByte(count);

+					tupleBuilder.addFieldEndOffset();

+				} catch (IOException e) {

+					throw new HyracksDataException(

+							"I/O exception when writing aggregation to the output buffer.");

+				}

+

+			}

+

+			@Override

+			public void outputFinalResult(ArrayTupleBuilder tupleBuilder,

+					IFrameTupleAccessor accessor, int tIndex,

+					AggregateState state) throws HyracksDataException {

+				outputPartialResult(tupleBuilder, accessor, tIndex, state);

+			}

+

+		};

+	}

+

+}

diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/util/NonSyncWriter.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/util/NonSyncWriter.java
new file mode 100644
index 0000000..24c4113
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/util/NonSyncWriter.java
@@ -0,0 +1,26 @@
+package edu.uci.ics.genomix.dataflow.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.SequenceFile.ValueBytes;
+
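+/**
+ * Minimal writer that appends raw key/value records in the uncompressed
+ * SequenceFile record layout (record length, key length, key bytes, value
+ * bytes) without emitting sync markers.
+ */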
+public class NonSyncWriter {
+	private FSDataOutputStream out;
+
+	public NonSyncWriter(FSDataOutputStream output) {
+		out = output;
+	}
+
+	public void appendRaw(byte[] keyData, int keyOffset, int keyLength,
+			ValueBytes val) throws IOException {
+		out.writeInt(keyLength + val.getSize()); // total record length
+
+		out.writeInt(keyLength); // key portion length
+
+		out.write(keyData, keyOffset, keyLength); // key
+
+		val.writeUncompressedBytes(out); // value
+	}
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
new file mode 100644
index 0000000..e0bd786
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -0,0 +1,152 @@
+package edu.uci.ics.genomix.driver;
+
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import edu.uci.ics.genomix.job.GenomixJob;
+import edu.uci.ics.genomix.job.JobGen;
+import edu.uci.ics.genomix.job.JobGenBrujinGraph;
+import edu.uci.ics.genomix.job.JobGenContigsGeneration;
+import edu.uci.ics.genomix.job.JobGenGraphCleanning;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class Driver {
+	public static enum Plan {
+		BUILD_DEBRUJIN_GRAPH, GRAPH_CLEANNING, CONTIGS_GENERATION,
+	}
+
+	private static final String IS_PROFILING = "genomix.driver.profiling";
+	private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
+	private static final String applicationName = GenomixJob.JOB_NAME;
+	private static final Log LOG = LogFactory.getLog(Driver.class);
+	private JobGen jobGen;
+	private boolean profiling;
+
+	private int numPartitionPerMachine;
+
+	private IHyracksClientConnection hcc;
+	private Scheduler scheduler;
+
+	public Driver(String ipAddress, int port, int numPartitionPerMachine)
+			throws HyracksException {
+		try {
+			hcc = new HyracksConnection(ipAddress, port);
+			scheduler = new Scheduler(hcc.getNodeControllerInfos());
+		} catch (Exception e) {
+			throw new HyracksException(e);
+		}
+		this.numPartitionPerMachine = numPartitionPerMachine;
+	}
+
+	public void runJob(GenomixJob job) throws HyracksException {
+		runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
+	}
+
+	public void runJob(GenomixJob job, Plan planChoice, boolean profiling)
+			throws HyracksException {
+		/** add hadoop configurations */
+		URL hadoopCore = job.getClass().getClassLoader()
+				.getResource("core-site.xml");
+		job.addResource(hadoopCore);
+		URL hadoopMapRed = job.getClass().getClassLoader()
+				.getResource("mapred-site.xml");
+		job.addResource(hadoopMapRed);
+		URL hadoopHdfs = job.getClass().getClassLoader()
+				.getResource("hdfs-site.xml");
+		job.addResource(hadoopHdfs);
+
+		LOG.info("job started");
+		long start = System.currentTimeMillis();
+		long end = start;
+		long time = 0;
+
+		this.profiling = profiling;
+		try {
+			Map<String, NodeControllerInfo> ncMap = hcc
+					.getNodeControllerInfos();
+			LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
+			switch (planChoice) {
+			case BUILD_DEBRUJIN_GRAPH:
+			default:
+				jobGen = new JobGenBrujinGraph(job, scheduler, ncMap,
+						numPartitionPerMachine);
+				break;
+			case GRAPH_CLEANNING:
+				jobGen = new JobGenGraphCleanning(job);
+				break;
+			case CONTIGS_GENERATION:
+				jobGen = new JobGenContigsGeneration(job);
+				break;
+			}
+
+			start = System.currentTimeMillis();
+			runCreate(jobGen);
+			end = System.currentTimeMillis();
+			time = end - start;
+			LOG.info("result writing finished in " + time + " ms");
+			LOG.info("job finished");
+		} catch (Exception e) {
+			throw new HyracksException(e);
+		}
+	}
+
+	private void runCreate(JobGen jobGen) throws Exception {
+		try {
+			JobSpecification createJob = jobGen.generateJob();
+			execute(createJob);
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw e;
+		}
+	}
+
+	private void execute(JobSpecification job) throws Exception {
+		job.setUseConnectorPolicyForScheduling(false);
+		JobId jobId = hcc.startJob(
+				applicationName,
+				job,
+				profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
+						.noneOf(JobFlag.class));
+		hcc.waitForCompletion(jobId);
+	}
+
+	public static void main(String[] args) throws Exception {
+		GenomixJob jobConf = new GenomixJob();
+		String[] otherArgs = new GenericOptionsParser(jobConf, args)
+				.getRemainingArgs();
+		if (otherArgs.length < 4) {
+			System.err.println("Need <serverIP> <port> <input> <output>");
+			System.exit(-1);
+		}
+		String ipAddress = otherArgs[0];
+		int port = Integer.parseInt(otherArgs[1]);
+		int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
+		boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
+		// FileInputFormat.setInputPaths(job, otherArgs[2]);
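+		// resolve the input/output paths against the working directory and
+		// set them directly on the JobConf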
+		{
+			Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
+			jobConf.set("mapred.input.dir", path.toString());
+
+			Path outputDir = new Path(jobConf.getWorkingDirectory(),
+					otherArgs[3]);
+			jobConf.set("mapred.output.dir", outputDir.toString());
+		}
+		// FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
+		// FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
+		Driver driver = new Driver(ipAddress, port, numOfDuplicate);
+		driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
+	}
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
new file mode 100644
index 0000000..39f181a
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
@@ -0,0 +1,75 @@
+package edu.uci.ics.genomix.job;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+
+public class GenomixJob extends JobConf {
+
+	public static final String JOB_NAME = "genomix";
+
+	/** Kmer length */
+	public static final String KMER_LENGTH = "genomix.kmer";
+	/** Frame Size */
+	public static final String FRAME_SIZE = "genomix.framesize";
+	/** Frame limit, required by Hyracks */
+	public static final String FRAME_LIMIT = "genomix.framelimit";
+	/** Table size, required by Hyracks */
+	public static final String TABLE_SIZE = "genomix.tablesize";
+	/** Group-by type */
+	public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
+	/** Graph output format */
+	public static final String OUTPUT_FORMAT = "genomix.graph.output";
+
+	/** Configuration for the hybrid group-by used in the graph-building phase */
+	public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
+	public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
+	public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
+	public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
+	public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
+	
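+	/** Default values used when the corresponding property is not set */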
+	public static final int DEFAULT_KMER= 55;
+	public static final int DEFAULT_FRAME_SIZE = 32768;
+	public static final int DEFAULT_FRAME_LIMIT = 4096;
+	public static final int DEFAULT_TABLE_SIZE = 10485767;
+	public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
+	public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
+	public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
+	public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
+	public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
+	
+	public static final String DEFAULT_GROUPBY_TYPE ="hybrid";
+	public static final String DEFAULT_OUTPUT_FORMAT ="binary";
+	
+	public GenomixJob() throws IOException {
+		super(new Configuration());
+	}
+
+	public GenomixJob(Configuration conf) throws IOException {
+		super(conf);
+	}
+
+	/**
+	 * Set the kmer length
+	 * 
+	 * @param kmerlength
+	 *            the desired kmer length
+	 */
+	final public void setKmerLength(int kmerlength) {
+		setInt(KMER_LENGTH, kmerlength);
+	}
+
+	final public void setFrameSize(int frameSize) {
+		setInt(FRAME_SIZE, frameSize);
+	}
+
+	final public void setFrameLimit(int frameLimit) {
+		setInt(FRAME_LIMIT, frameLimit);
+	}
+
+	final public void setTableSize(int tableSize) {
+		setInt(TABLE_SIZE, tableSize);
+	}
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
new file mode 100644
index 0000000..557da6b
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGen.java
@@ -0,0 +1,27 @@
+package edu.uci.ics.genomix.job;
+
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public abstract class JobGen {
+
+	protected final Configuration conf;
+	protected final GenomixJob genomixJob;
+	protected String jobId = new UUID(System.currentTimeMillis(),
+			System.nanoTime()).toString();
+
+	public JobGen(GenomixJob job) {
+		this.conf = job;
+		this.genomixJob = job;
+		this.initJobConfiguration();
+	}
+
+	protected abstract void initJobConfiguration();
+
+	public abstract JobSpecification generateJob() throws HyracksException;
+
+}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
new file mode 100644
index 0000000..95562a6
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -0,0 +1,311 @@
+package edu.uci.ics.genomix.job;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.data.normalizers.VLongNormalizedKeyComputerFactory;
+import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;
+import edu.uci.ics.genomix.data.std.primitive.VLongPointable;
+import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.genomix.dataflow.KMerSequenceWriterFactory;
+import edu.uci.ics.genomix.dataflow.KMerTextWriterFactory;
+import edu.uci.ics.genomix.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
+import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenBrujinGraph extends JobGen {
+	public enum GroupbyType {
+		EXTERNAL, PRECLUSTER, HYBRIDHASH,
+	}
+
+	public enum OutputFormat {
+		TEXT, BINARY,
+	}
+
+	JobConf job;
+	private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+	private Scheduler scheduler;
+	private String[] ncNodeNames;
+
+	private int kmers;
+	private int frameLimits;
+	private int frameSize;
+	private int tableSize;
+	private GroupbyType groupbyType;
+	private OutputFormat outputFormat;
+
+	private AbstractOperatorDescriptor singleGrouper;
+	private IConnectorDescriptor connPartition;
+	private AbstractOperatorDescriptor crossGrouper;
+	private RecordDescriptor readOutputRec;
+	private RecordDescriptor combineOutputRec;
+
+	/** works for hybrid hashing */
+	private long inputSizeInRawRecords;
+	private long inputSizeInUniqueKeys;
+	private int recordSizeInBytes;
+	private int hashfuncStartLevel;
+
+	public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
+			final Map<String, NodeControllerInfo> ncMap,
+			int numPartitionPerMachine) {
+		super(job);
+		this.scheduler = scheduler;
+		String[] nodes = new String[ncMap.size()];
+		ncMap.keySet().toArray(nodes);
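+		// each node controller hosts numPartitionPerMachine partitions, so its
+		// name is repeated that many times in the location constraints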
+		ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+		for (int i = 0; i < numPartitionPerMachine; i++) {
+			System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length,
+					nodes.length);
+		}
+		LOG.info("nc nodes:" + ncNodeNames.length + " "
+				+ java.util.Arrays.toString(ncNodeNames));
+	}
+
+	private ExternalGroupOperatorDescriptor newExternalGroupby(
+			JobSpecification jobSpec, int[] keyFields,
+			IAggregatorDescriptorFactory aggregator) {
+		return new ExternalGroupOperatorDescriptor(
+				jobSpec,
+				keyFields,
+				frameLimits,
+				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+						.of(VLongPointable.FACTORY) },
+				new VLongNormalizedKeyComputerFactory(),
+				aggregator,
+				new DistributedMergeLmerAggregateFactory(),
+				combineOutputRec,
+				new HashSpillableTableFactory(
+						new FieldHashPartitionComputerFactory(
+								keyFields,
+								new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+										.of(VLongPointable.FACTORY) }),
+						tableSize), true);
+	}
+
+	private HybridHashGroupOperatorDescriptor newHybridGroupby(
+			JobSpecification jobSpec, int[] keyFields,
+			long inputSizeInRawRecords, long inputSizeInUniqueKeys,
+			int recordSizeInBytes, int hashfuncStartLevel)
+			throws HyracksDataException {
+		return new HybridHashGroupOperatorDescriptor(
+				jobSpec,
+				keyFields,
+				frameLimits,
+				inputSizeInRawRecords,
+				inputSizeInUniqueKeys,
+				recordSizeInBytes,
+				tableSize,
+				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+						.of(VLongPointable.FACTORY) },
+				new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
+				hashfuncStartLevel, new VLongNormalizedKeyComputerFactory(),
+				new MergeKmerAggregateFactory(),
+				new DistributedMergeLmerAggregateFactory(), combineOutputRec,
+				true);
+	}
+
+	private void generateDescriptorbyType(JobSpecification jobSpec)
+			throws HyracksDataException {
+		int[] keyFields = new int[] { 0 }; // the id of grouped key
+
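+		// every strategy builds the same two-stage plan: a per-partition
+		// (single) grouper, a kmer-keyed repartitioning connector, and a
+		// global (cross) grouper that merges the partial aggregates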
+		switch (groupbyType) {
+		case EXTERNAL:
+			singleGrouper = newExternalGroupby(jobSpec, keyFields,
+					new MergeKmerAggregateFactory());
+			connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
+					new KmerHashPartitioncomputerFactory());
+			crossGrouper = newExternalGroupby(jobSpec, keyFields,
+					new DistributedMergeLmerAggregateFactory());
+			break;
+		case PRECLUSTER:
+			singleGrouper = newExternalGroupby(jobSpec, keyFields,
+					new MergeKmerAggregateFactory());
+			connPartition = new MToNPartitioningMergingConnectorDescriptor(
+					jobSpec,
+					new KmerHashPartitioncomputerFactory(),
+					keyFields,
+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+							.of(VLongPointable.FACTORY) });
+			crossGrouper = new PreclusteredGroupOperatorDescriptor(
+					jobSpec,
+					keyFields,
+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+							.of(VLongPointable.FACTORY) },
+					new DistributedMergeLmerAggregateFactory(),
+					combineOutputRec);
+			break;
+		case HYBRIDHASH:
+		default:
+
+			singleGrouper = newHybridGroupby(jobSpec, keyFields,
+					inputSizeInRawRecords, inputSizeInUniqueKeys,
+					recordSizeInBytes, hashfuncStartLevel);
+			connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
+					new KmerHashPartitioncomputerFactory());
+
+			crossGrouper = newHybridGroupby(jobSpec, keyFields,
+					inputSizeInRawRecords, inputSizeInUniqueKeys,
+					recordSizeInBytes, hashfuncStartLevel);
+			break;
+		}
+	}
+
+	public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
+			throws HyracksDataException {
+		try {
+
+			InputSplit[] splits = job.getInputFormat().getSplits(job,
+					ncNodeNames.length);
+
+			LOG.info("HDFS read into " + splits.length + " splits");
+			String[] readSchedule = scheduler.getLocationConstraints(splits);
+			return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job,
+					splits, readSchedule, new ReadsKeyValueParserFactory(kmers));
+		} catch (Exception e) {
+			throw new HyracksDataException(e);
+		}
+	}
+
+	@Override
+	public JobSpecification generateJob() throws HyracksException {
+
+		JobSpecification jobSpec = new JobSpecification();
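+		// reader output: (kmer, bitmap byte); grouper output: (kmer, bitmap
+		// byte, count byte)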
+		readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+				null, ByteSerializerDeserializer.INSTANCE });
+		combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+				null, ByteSerializerDeserializer.INSTANCE,
+				ByteSerializerDeserializer.INSTANCE });
+		jobSpec.setFrameSize(frameSize);
+
+		// File input
+		HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				readOperator, ncNodeNames);
+
+		generateDescriptorbyType(jobSpec);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				singleGrouper, ncNodeNames);
+
+		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
+				jobSpec);
+		jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				crossGrouper, ncNodeNames);
+		jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
+
+		// Output
+		ITupleWriterFactory writer = null;
+		switch (outputFormat) {
+		case TEXT:
+			writer = new KMerTextWriterFactory(kmers);
+			break;
+		case BINARY:
+		default:
+			writer = new KMerSequenceWriterFactory(job);
+			break;
+		}
+		HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(
+				jobSpec, job, writer);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				writeOperator, ncNodeNames);
+
+		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
+				jobSpec);
+		jobSpec.connect(printConn, crossGrouper, 0, writeOperator, 0);
+		jobSpec.addRoot(writeOperator);
+
+		if (groupbyType == GroupbyType.PRECLUSTER) {
+			jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+		}
+		return jobSpec;
+	}
+
+	@Override
+	protected void initJobConfiguration() {
+
+		kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+		frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT,
+				GenomixJob.DEFAULT_FRAME_LIMIT);
+		tableSize = conf.getInt(GenomixJob.TABLE_SIZE,
+				GenomixJob.DEFAULT_TABLE_SIZE);
+		frameSize = conf.getInt(GenomixJob.FRAME_SIZE,
+				GenomixJob.DEFAULT_FRAME_SIZE);
+		inputSizeInRawRecords = conf.getLong(
+				GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
+				GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
+		inputSizeInUniqueKeys = conf.getLong(
+				GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
+				GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
+		recordSizeInBytes = conf.getInt(
+				GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
+				GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
+		hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
+				GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
+		/** the cross-grouper record size is read last and overrides the single-grouper value above */
+		recordSizeInBytes = conf.getInt(
+				GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
+				GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
+
+		String type = conf.get(GenomixJob.GROUPBY_TYPE,
+				GenomixJob.DEFAULT_GROUPBY_TYPE);
+		if (type.equalsIgnoreCase("external")) {
+			groupbyType = GroupbyType.EXTERNAL;
+		} else if (type.equalsIgnoreCase("precluster")) {
+			groupbyType = GroupbyType.PRECLUSTER;
+		} else {
+			groupbyType = GroupbyType.HYBRIDHASH;
+		}
+
+		String output = conf.get(GenomixJob.OUTPUT_FORMAT,
+				GenomixJob.DEFAULT_OUTPUT_FORMAT);
+		if (output.equalsIgnoreCase("text")) {
+			outputFormat = OutputFormat.TEXT;
+		} else {
+			outputFormat = OutputFormat.BINARY;
+		}
+		job = new JobConf(conf);
+		LOG.info("Genomix Graph Build Configuration");
+		LOG.info("Kmer:" + kmers);
+		LOG.info("Groupby type:" + type);
+		LOG.info("Output format:" + output);
+		LOG.info("Frame limit:" + frameLimits);
+		LOG.info("Frame size:" + frameSize);
+	}
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
new file mode 100644
index 0000000..6d30fad
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.genomix.job;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobGenContigsGeneration extends JobGen {
+
+	public JobGenContigsGeneration(GenomixJob job) {
+		super(job);
+	}
+
+	@Override
+	public JobSpecification generateJob() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	protected void initJobConfiguration() {
+		// TODO Auto-generated method stub
+
+	}
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
new file mode 100644
index 0000000..43b5a97
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.genomix.job;
+
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobGenGraphCleanning extends JobGen {
+
+	public JobGenGraphCleanning(GenomixJob job) {
+		super(job);
+	}
+
+	@Override
+	public JobSpecification generateJob() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	protected void initJobConfiguration() {
+		// TODO Auto-generated method stub
+
+	}
+
+}
diff --git a/genomix/genomix-hyracks/src/main/resources/conf/cluster.properties b/genomix/genomix-hyracks/src/main/resources/conf/cluster.properties
new file mode 100644
index 0000000..eabd81b
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/conf/cluster.properties
@@ -0,0 +1,40 @@
+#The CC port for Hyracks clients
+CC_CLIENTPORT=3099
+
+#The CC port for Hyracks cluster management
+CC_CLUSTERPORT=1099
+
+#The directory of hyracks binaries
+HYRACKS_HOME="../../../../hyracks"
+
+#The tmp directory for cc to install jars
+CCTMP_DIR=/tmp/t1
+
+#The tmp directory for nc to install jars
+NCTMP_DIR=/tmp/t2
+
+#The directory to put cc logs
+CCLOGS_DIR=$CCTMP_DIR/logs
+
+#The directory to put nc logs
+NCLOGS_DIR=$NCTMP_DIR/logs
+
+#Comma separated I/O directories for the spilling of external sort
+IO_DIRS="/tmp/t3,/tmp/t4"
+
+#The JAVA_HOME
+JAVA_HOME=$JAVA_HOME
+
+#Add HADOOP_HOME to the CLASSPATH
+CLASSPATH="${HADOOP_HOME}:${CLASSPATH}:."
+
+#The frame size of the internal dataflow engine
+FRAME_SIZE=65536
+
+#CC JAVA_OPTS
+CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
+
+#NC JAVA_OPTS
+NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx10g -Djava.util.logging.config.file=logging.properties"
+
diff --git a/genomix/genomix-hyracks/src/main/resources/conf/debugnc.properties b/genomix/genomix-hyracks/src/main/resources/conf/debugnc.properties
new file mode 100644
index 0000000..27afa26
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/conf/debugnc.properties
@@ -0,0 +1,12 @@
+#The tmp directory for nc to install jars
+NCTMP_DIR2=/tmp/t-1
+
+#The directory to put nc logs
+NCLOGS_DIR2=$NCTMP_DIR/logs
+
+#Comma separated I/O directories for the spilling of external sort
+IO_DIRS2="/tmp/t-2,/tmp/t-3"
+
+#NC JAVA_OPTS
+NCJAVA_OPTS2="-Xdebug -Xrunjdwp:transport=dt_socket,address=7003,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+
diff --git a/genomix/genomix-hyracks/src/main/resources/conf/master b/genomix/genomix-hyracks/src/main/resources/conf/master
new file mode 100644
index 0000000..2fbb50c
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/conf/master
@@ -0,0 +1 @@
+localhost
diff --git a/genomix/genomix-hyracks/src/main/resources/conf/slaves b/genomix/genomix-hyracks/src/main/resources/conf/slaves
new file mode 100644
index 0000000..2fbb50c
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/conf/slaves
@@ -0,0 +1 @@
+localhost
diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/genomix b/genomix/genomix-hyracks/src/main/resources/scripts/genomix
new file mode 100644
index 0000000..bdd7f20
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/genomix
@@ -0,0 +1,113 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+#  Copyright 2001-2006 The Apache Software Foundation.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+# ----------------------------------------------------------------------------
+#
+#   Copyright (c) 2001-2006 The Apache Software Foundation.  All rights
+#   reserved.
+
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ]; do
+  ls=`ls -ld "$PRG"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    PRG=`dirname "$PRG"`/"$link"
+  fi
+done
+
+PRGDIR=`dirname "$PRG"`
+BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
+
+
+
+# OS specific support.  $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+case "`uname`" in
+  CYGWIN*) cygwin=true ;;
+  Darwin*) darwin=true
+           if [ -z "$JAVA_VERSION" ] ; then
+             JAVA_VERSION="CurrentJDK"
+           else
+             echo "Using Java version: $JAVA_VERSION"
+           fi
+           if [ -z "$JAVA_HOME" ] ; then
+             JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
+           fi
+           ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+  if [ -r /etc/gentoo-release ] ; then
+    JAVA_HOME=`java-config --jre-home`
+  fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# If a specific java binary isn't specified search for the standard 'java' binary
+if [ -z "$JAVACMD" ] ; then
+  if [ -n "$JAVA_HOME"  ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+      # IBM's JDK on AIX uses strange locations for the executables
+      JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+      JAVACMD="$JAVA_HOME/bin/java"
+    fi
+  else
+    JAVACMD=`which java`
+  fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+  echo "Error: JAVA_HOME is not defined correctly." 1>&2
+  echo "  We cannot execute $JAVACMD" 1>&2
+  exit 1
+fi
+
+if [ -z "$REPO" ]
+then
+  REPO="$BASEDIR"/lib
+fi
+
+CLASSPATH=$CLASSPATH_PREFIX:"$BASEDIR"/etc:"$REPO"/hyracks-dataflow-std-0.2.3-SNAPSHOT.jar:"$REPO"/hyracks-api-0.2.3-SNAPSHOT.jar:"$REPO"/json-20090211.jar:"$REPO"/httpclient-4.1-alpha2.jar:"$REPO"/httpcore-4.1-beta1.jar:"$REPO"/commons-logging-1.1.1.jar:"$REPO"/commons-codec-1.4.jar:"$REPO"/args4j-2.0.12.jar:"$REPO"/commons-lang3-3.1.jar:"$REPO"/hyracks-dataflow-common-0.2.3-SNAPSHOT.jar:"$REPO"/hyracks-data-std-0.2.3-SNAPSHOT.jar:"$REPO"/hyracks-control-cc-0.2.3-SNAPSHOT.jar:"$REPO"/hyracks-control-common-0.2.3-SNAPSHOT.jar:"$REPO"/jetty-server-8.0.0.RC0.jar:"$REPO"/servlet-api-3.0.20100224.jar:"$REPO"/jetty-continuation-8.0.0.RC0.jar:"$REPO"/jetty-http-8.0.0.RC0.jar:"$REPO"/jetty-io-8.0.0.RC0.jar:"$REPO"/jetty-webapp-8.0.0.RC0.jar:"$REPO"/jetty-xml-8.0.0.RC0.jar:"$REPO"/jetty-util-8.0.0.RC0.jar:"$REPO"/jetty-servlet-8.0.0.RC0.jar:"$REPO"/jetty-security-8.0.0.RC0.jar:"$REPO"/wicket-core-1.5.2.jar:"$REPO"/wicket-util-1.5.2.jar:"$REPO"/wicket-request-1.5.2.jar:"$REPO"/slf4j-api-1.6.1.jar:"$REPO"/slf4j-jcl-1.6.3.jar:"$REPO"/hyracks-control-nc-0.2.3-SNAPSHOT.jar:"$REPO"/dcache-client-0.0.1.jar:"$REPO"/jetty-client-8.0.0.M0.jar:"$REPO"/hyracks-net-0.2.3-SNAPSHOT.jar:"$REPO"/commons-io-1.3.1.jar:"$REPO"/hyracks-ipc-0.2.3-SNAPSHOT.jar:"$REPO"/genomix-0.2.3-SNAPSHOT.pom
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+  [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`
+  [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
+  [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
+fi
+
+exec "$JAVACMD" $JAVA_OPTS  \
+  -classpath "$CLASSPATH" \
+  -Dapp.name="genomix" \
+  -Dapp.pid="$$" \
+  -Dapp.repo="$REPO" \
+  -Dapp.home="$BASEDIR" \
+  -Dbasedir="$BASEDIR" \
+  edu.uci.ics.genomix.driver.Driver \
+  "$@"
diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/genomix.bat b/genomix/genomix-hyracks/src/main/resources/scripts/genomix.bat
new file mode 100644
index 0000000..1bd2098
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/genomix.bat
@@ -0,0 +1,108 @@
+@REM ----------------------------------------------------------------------------

+@REM  Copyright 2001-2006 The Apache Software Foundation.

+@REM

+@REM  Licensed under the Apache License, Version 2.0 (the "License");

+@REM  you may not use this file except in compliance with the License.

+@REM  You may obtain a copy of the License at

+@REM

+@REM       http://www.apache.org/licenses/LICENSE-2.0

+@REM

+@REM  Unless required by applicable law or agreed to in writing, software

+@REM  distributed under the License is distributed on an "AS IS" BASIS,

+@REM  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+@REM  See the License for the specific language governing permissions and

+@REM  limitations under the License.

+@REM ----------------------------------------------------------------------------

+@REM

+@REM   Copyright (c) 2001-2006 The Apache Software Foundation.  All rights

+@REM   reserved.

+

+@echo off

+

+set ERROR_CODE=0

+

+:init

+@REM Decide how to startup depending on the version of windows

+

+@REM -- Win98ME

+if NOT "%OS%"=="Windows_NT" goto Win9xArg

+

+@REM set local scope for the variables with windows NT shell

+if "%OS%"=="Windows_NT" @setlocal

+

+@REM -- 4NT shell

+if "%eval[2+2]" == "4" goto 4NTArgs

+

+@REM -- Regular WinNT shell

+set CMD_LINE_ARGS=%*

+goto WinNTGetScriptDir

+

+@REM The 4NT Shell from jp software

+:4NTArgs

+set CMD_LINE_ARGS=%$

+goto WinNTGetScriptDir

+

+:Win9xArg

+@REM Slurp the command line arguments.  This loop allows for an unlimited number

+@REM of arguments (up to the command line limit, anyway).

+set CMD_LINE_ARGS=

+:Win9xApp

+if %1a==a goto Win9xGetScriptDir

+set CMD_LINE_ARGS=%CMD_LINE_ARGS% %1

+shift

+goto Win9xApp

+

+:Win9xGetScriptDir

+set SAVEDIR=%CD%

+%0\

+cd %0\..\.. 

+set BASEDIR=%CD%

+cd %SAVEDIR%

+set SAVE_DIR=

+goto repoSetup

+

+:WinNTGetScriptDir

+set BASEDIR=%~dp0\..

+

+:repoSetup

+

+

+if "%JAVACMD%"=="" set JAVACMD=java

+

+if "%REPO%"=="" set REPO=%BASEDIR%\lib

+

+set CLASSPATH="%BASEDIR%"\etc;"%REPO%"\hyracks-dataflow-std-0.2.3-SNAPSHOT.jar;"%REPO%"\hyracks-api-0.2.3-SNAPSHOT.jar;"%REPO%"\json-20090211.jar;"%REPO%"\httpclient-4.1-alpha2.jar;"%REPO%"\httpcore-4.1-beta1.jar;"%REPO%"\commons-logging-1.1.1.jar;"%REPO%"\commons-codec-1.4.jar;"%REPO%"\args4j-2.0.12.jar;"%REPO%"\commons-lang3-3.1.jar;"%REPO%"\hyracks-dataflow-common-0.2.3-SNAPSHOT.jar;"%REPO%"\hyracks-data-std-0.2.3-SNAPSHOT.jar;"%REPO%"\hyracks-control-cc-0.2.3-SNAPSHOT.jar;"%REPO%"\hyracks-control-common-0.2.3-SNAPSHOT.jar;"%REPO%"\jetty-server-8.0.0.RC0.jar;"%REPO%"\servlet-api-3.0.20100224.jar;"%REPO%"\jetty-continuation-8.0.0.RC0.jar;"%REPO%"\jetty-http-8.0.0.RC0.jar;"%REPO%"\jetty-io-8.0.0.RC0.jar;"%REPO%"\jetty-webapp-8.0.0.RC0.jar;"%REPO%"\jetty-xml-8.0.0.RC0.jar;"%REPO%"\jetty-util-8.0.0.RC0.jar;"%REPO%"\jetty-servlet-8.0.0.RC0.jar;"%REPO%"\jetty-security-8.0.0.RC0.jar;"%REPO%"\wicket-core-1.5.2.jar;"%REPO%"\wicket-util-1.5.2.jar;"%REPO%"\wicket-request-1.5.2.jar;"%REPO%"\slf4j-api-1.6.1.jar;"%REPO%"\slf4j-jcl-1.6.3.jar;"%REPO%"\hyracks-control-nc-0.2.3-SNAPSHOT.jar;"%REPO%"\dcache-client-0.0.1.jar;"%REPO%"\jetty-client-8.0.0.M0.jar;"%REPO%"\hyracks-net-0.2.3-SNAPSHOT.jar;"%REPO%"\commons-io-1.3.1.jar;"%REPO%"\hyracks-ipc-0.2.3-SNAPSHOT.jar;"%REPO%"\genomix-0.2.3-SNAPSHOT.pom

+goto endInit

+

+@REM Reaching here means variables are defined and arguments have been captured

+:endInit

+

+%JAVACMD% %JAVA_OPTS%  -classpath %CLASSPATH_PREFIX%;%CLASSPATH% -Dapp.name="genomix" -Dapp.repo="%REPO%" -Dapp.home="%BASEDIR%" -Dbasedir="%BASEDIR%" edu.uci.ics.genomix.driver.Driver %CMD_LINE_ARGS%

+if ERRORLEVEL 1 goto error

+goto end

+

+:error

+if "%OS%"=="Windows_NT" @endlocal

+set ERROR_CODE=%ERRORLEVEL%

+

+:end

+@REM set local scope for the variables with windows NT shell

+if "%OS%"=="Windows_NT" goto endNT

+

+@REM For old DOS remove the set variables from ENV - we assume they were not set

+@REM before we started - at least we don't leave any baggage around

+set CMD_LINE_ARGS=

+goto postExec

+

+:endNT

+@REM If error code is set to 1 then the endlocal was done already in :error.

+if %ERROR_CODE% EQU 0 @endlocal

+

+

+:postExec

+

+if "%FORCE_EXIT_ON_ERROR%" == "on" (

+  if %ERROR_CODE% NEQ 0 exit %ERROR_CODE%

+)

+

+exit /B %ERROR_CODE%

diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/getip.sh b/genomix/genomix-hyracks/src/main/resources/scripts/getip.sh
new file mode 100644
index 0000000..e0cdf73
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/getip.sh
@@ -0,0 +1,21 @@
+#get the OS
+OS_NAME=`uname -a|awk '{print $1}'`
+LINUX_OS='Linux'
+
+if [ $OS_NAME = $LINUX_OS ];
+then
+        #Get IP Address
+        IPADDR=`/sbin/ifconfig eth0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+	if [ "$IPADDR" = "" ]
+        then
+		IPADDR=`/sbin/ifconfig lo | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+        fi 
+else
+        IPADDR=`/sbin/ifconfig en1 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+	if [ "$IPADDR" = "" ]
+        then
+                IPADDR=`/sbin/ifconfig lo0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+        fi
+
+fi
+echo $IPADDR
diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/startAllNCs.sh b/genomix/genomix-hyracks/src/main/resources/scripts/startAllNCs.sh
new file mode 100644
index 0000000..5e38c40
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/startAllNCs.sh
@@ -0,0 +1,6 @@
+GENOMIX_PATH=`pwd`
+
+for i in `cat conf/slaves`
+do
+   ssh $i "cd ${GENOMIX_PATH}; bin/startnc.sh"
+done
diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/startCluster.sh b/genomix/genomix-hyracks/src/main/resources/scripts/startCluster.sh
new file mode 100755
index 0000000..4727764
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/startCluster.sh
@@ -0,0 +1,19 @@
+bin/startcc.sh
+sleep 5
+bin/startAllNCs.sh
+
+. conf/cluster.properties
+# do we need to specify the version somewhere?
+hyrackcmd=`ls ${HYRACKS_HOME}/hyracks-cli/target/hyracks-cli-*-binary-assembly/bin/hyrackscli`
+# find zip file
+appzip=`ls $PWD/../genomix-*-binary-assembly.zip`
+
+[ -f $hyrackcmd ] || { echo "Hyracks commandline is missing"; exit -1;}
+[ -f $appzip ] || { echo "Genomix binary-assembly.zip is missing"; exit -1;}
+
+CCHOST_NAME=`cat conf/master`
+
+IPADDR=`bin/getip.sh`
+echo "connect to \"${IPADDR}:${CC_CLIENTPORT}\"; create application genomix \"$appzip\";" | $hyrackcmd 
+echo ""
+
diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh b/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh
new file mode 100644
index 0000000..c335475
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/startDebugNc.sh
@@ -0,0 +1,50 @@
+hostname
+
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CURRENT_PATH=`pwd`
+CCHOST=`ssh ${CCHOST_NAME} "cd ${CURRENT_PATH}; bin/getip.sh"`
+
+#Import cluster properties
+. conf/cluster.properties
+. conf/debugnc.properties
+
+#Clean up temp dir
+
+#rm -rf $NCTMP_DIR2
+mkdir $NCTMP_DIR2
+
+#Clean up log dir
+#rm -rf $NCLOGS_DIR2
+mkdir $NCLOGS_DIR2
+
+
+#Clean up I/O working dir
+io_dirs=$(echo $IO_DIRS2 | tr "," "\n")
+for io_dir in $io_dirs
+do
+	#rm -rf $io_dir
+	mkdir $io_dir
+done
+
+#Set JAVA_HOME
+export JAVA_HOME=$JAVA_HOME
+
+#Get OS
+IPADDR=`bin/getip.sh`
+
+#Get node ID
+NODEID=`hostname | cut -d '.' -f 1`
+NODEID=${NODEID}2
+
+#Set JAVA_OPTS
+export JAVA_OPTS=$NCJAVA_OPTS2
+
+cd $HYRACKS_HOME
+HYRACKS_HOME=`pwd`
+
+#Enter the temp dir
+cd $NCTMP_DIR2
+
+#Launch hyracks nc
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS2}" &> $NCLOGS_DIR2/$NODEID.log &
diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/startcc.sh b/genomix/genomix-hyracks/src/main/resources/scripts/startcc.sh
new file mode 100644
index 0000000..93967e7
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/startcc.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+hostname
+
+#Import cluster properties
+. conf/cluster.properties
+
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CCHOST=`bin/getip.sh`
+
+#Remove the temp dir
+#rm -rf $CCTMP_DIR
+mkdir $CCTMP_DIR
+
+#Remove the logs dir
+#rm -rf $CCLOGS_DIR
+mkdir $CCLOGS_DIR
+
+#Export JAVA_HOME and JAVA_OPTS
+export JAVA_HOME=$JAVA_HOME
+export JAVA_OPTS=$CCJAVA_OPTS
+
+#Launch hyracks cc script
+chmod -R 755 $HYRACKS_HOME
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/startnc.sh b/genomix/genomix-hyracks/src/main/resources/scripts/startnc.sh
new file mode 100644
index 0000000..ceaa7bc
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/startnc.sh
@@ -0,0 +1,49 @@
+hostname
+
+MY_NAME=`hostname`
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CURRENT_PATH=`pwd`
+CCHOST=`ssh ${CCHOST_NAME} "cd ${CURRENT_PATH}; bin/getip.sh"`
+
+#Import cluster properties
+. conf/cluster.properties
+
+#Clean up temp dir
+
+#rm -rf $NCTMP_DIR
+mkdir $NCTMP_DIR
+
+#Clean up log dir
+#rm -rf $NCLOGS_DIR
+mkdir $NCLOGS_DIR
+
+
+#Clean up I/O working dir
+io_dirs=$(echo $IO_DIRS | tr "," "\n")
+for io_dir in $io_dirs
+do
+	#rm -rf $io_dir
+	mkdir $io_dir
+done
+
+#Set JAVA_HOME
+export JAVA_HOME=$JAVA_HOME
+
+IPADDR=`bin/getip.sh`
+#echo $IPADDR
+
+#Get node ID
+NODEID=`hostname | cut -d '.' -f 1`
+
+#Set JAVA_OPTS
+export JAVA_OPTS=$NCJAVA_OPTS
+
+cd $HYRACKS_HOME
+HYRACKS_HOME=`pwd`
+
+#Enter the temp dir
+cd $NCTMP_DIR
+
+#Launch hyracks nc
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/stopAllNCs.sh b/genomix/genomix-hyracks/src/main/resources/scripts/stopAllNCs.sh
new file mode 100644
index 0000000..66ed866
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/stopAllNCs.sh
@@ -0,0 +1,6 @@
+GENOMIX_PATH=`pwd`
+
+for i in `cat conf/slaves`
+do
+   ssh $i "cd ${GENOMIX_PATH}; bin/stopnc.sh"
+done
diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/stopCluster.sh b/genomix/genomix-hyracks/src/main/resources/scripts/stopCluster.sh
new file mode 100644
index 0000000..4889934
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/stopCluster.sh
@@ -0,0 +1,3 @@
+bin/stopAllNCs.sh
+sleep 2
+bin/stopcc.sh
diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/stopcc.sh b/genomix/genomix-hyracks/src/main/resources/scripts/stopcc.sh
new file mode 100644
index 0000000..1865054
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/stopcc.sh
@@ -0,0 +1,10 @@
+hostname
+. conf/cluster.properties
+
+#Kill process
+PID=`ps -ef|grep ${USER}|grep java|grep hyracks|awk '{print $2}'`
+echo $PID
+[ "$PID" != "" ] && kill -9 $PID
+
+#Clean up CC temp dir
+rm -rf $CCTMP_DIR/*
diff --git a/genomix/genomix-hyracks/src/main/resources/scripts/stopnc.sh b/genomix/genomix-hyracks/src/main/resources/scripts/stopnc.sh
new file mode 100644
index 0000000..3928bb7
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/resources/scripts/stopnc.sh
@@ -0,0 +1,23 @@
+hostname
+. conf/cluster.properties
+
+#Kill process
+PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
+
+if [ "$PID" == "" ]; then
+  USERID=`id | sed 's/^uid=//;s/(.*$//'`
+  PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
+fi
+
+echo $PID
+[ "$PID" != "" ] && kill -9 $PID
+
+#Clean up I/O working dir
+io_dirs=$(echo $IO_DIRS | tr "," "\n")
+for io_dir in $io_dirs
+do
+	rm -rf $io_dir/*
+done
+
+#Clean up NC temp dir
+rm -rf $NCTMP_DIR/*