Use the HDFS descriptor from hyracks-hdfs.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2872 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
index bb868ce..a0a6b9a 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -4,44 +4,46 @@
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
 
-public class Integer64NormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
-    private static final long serialVersionUID = 8735044913496854551L;
+public class Integer64NormalizedKeyComputerFactory implements
+		INormalizedKeyComputerFactory {
+	private static final long serialVersionUID = 8735044913496854551L;
 
-    @Override
-    public INormalizedKeyComputer createNormalizedKeyComputer() {
-        return new INormalizedKeyComputer() {
-            private static final int POSTIVE_LONG_MASK = (3 << 30);
-            private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
-            private static final int NEGATIVE_LONG_MASK = (0 << 30);
+	@Override
+	public INormalizedKeyComputer createNormalizedKeyComputer() {
+		return new INormalizedKeyComputer() {
+			private static final int POSITIVE_LONG_MASK = (3 << 30);
+			private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
+			private static final int NEGATIVE_LONG_MASK = (0 << 30);
 
-            @Override
-            public int normalize(byte[] bytes, int start, int length) {
-                long value = Integer64SerializerDeserializer.getLong(bytes, start);
-                int highValue = (int) (value >> 32);
-                if (highValue > 0) {
-                    /** * larger than Integer.MAX */
-                    int highNmk = getKey(highValue);
-                    highNmk >>= 2;
-                    highNmk |= POSTIVE_LONG_MASK;
-                    return highNmk;
-                } else if (highValue == 0) {
-                    /** * smaller than Integer.MAX but >=0 */
-                    int lowNmk = (int) value;
-                    lowNmk >>= 2;
-                    lowNmk |= NON_NEGATIVE_INT_MASK;
-                    return lowNmk;
-                } else {
-                    /** * less than 0: have not optimized for that */
-                    int highNmk = getKey(highValue);
-                    highNmk >>= 2;
-                    highNmk |= NEGATIVE_LONG_MASK;
-                    return highNmk;
-                }
-            }
+			@Override
+			public int normalize(byte[] bytes, int start, int length) {
+				long value = Integer64SerializerDeserializer.getLong(bytes,
+						start);
+				int highValue = (int) (value >> 32);
+				if (highValue > 0) {
+					/* larger than Integer.MAX_VALUE */
+					int highNmk = getKey(highValue);
+					highNmk >>= 2;
+					highNmk |= POSITIVE_LONG_MASK;
+					return highNmk;
+				} else if (highValue == 0) {
+					/* fits in an int and >= 0 */
+					int lowNmk = (int) value;
+					lowNmk >>= 2;
+					lowNmk |= NON_NEGATIVE_INT_MASK;
+					return lowNmk;
+				} else {
+					/* less than 0: not optimized for that case */
+					int highNmk = getKey(highValue);
+					highNmk >>= 2;
+					highNmk |= NEGATIVE_LONG_MASK;
+					return highNmk;
+				}
+			}
 
-            private int getKey(int value) {
-                return value ^ Integer.MIN_VALUE;
-            }
-        };
-    }
+			private int getKey(int value) {
+				return value ^ Integer.MIN_VALUE;
+			}
+		};
+	}
 }
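Note: a normalized key is a 32-bit prefix that Hyracks sorters compare as unsigned ints before falling back to the full comparator, so it only needs to be order-preserving, not injective. A minimal sanity check of that property, assuming only the factory above (the sample values and the big-endian hand-encoding are illustrative):

    // Illustrative check: keys must be non-decreasing (as unsigned ints)
    // when the underlying longs increase.
    INormalizedKeyComputer nkc = new Integer64NormalizedKeyComputerFactory()
            .createNormalizedKeyComputer();
    long[] samples = { Long.MIN_VALUE, -5L, 0L, 42L,
            (long) Integer.MAX_VALUE + 1, Long.MAX_VALUE };
    byte[] buf = new byte[8];
    int prev = Integer.MIN_VALUE; // unsigned-smallest after the XOR trick below
    for (long v : samples) {
        for (int j = 0; j < 8; j++) {
            buf[j] = (byte) (v >>> (56 - 8 * j)); // big-endian encode
        }
        int key = nkc.normalize(buf, 0, 8) ^ Integer.MIN_VALUE; // unsigned compare trick
        assert prev <= key;
        prev = key;
    }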

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
index f3a3e72..ac8a6bc 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
@@ -3,60 +3,59 @@
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 
-public class VLongNormalizedKeyComputerFactory  implements INormalizedKeyComputerFactory {
-    private static final long serialVersionUID = 8735044913496854551L;
+public class VLongNormalizedKeyComputerFactory implements
+		INormalizedKeyComputerFactory {
+	private static final long serialVersionUID = 8735044913496854551L;
 
-    @Override
-    public INormalizedKeyComputer createNormalizedKeyComputer() {
-        return new INormalizedKeyComputer() {
-            private static final int POSTIVE_LONG_MASK = (3 << 30);
-            private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
-            private static final int NEGATIVE_LONG_MASK = (0 << 30);
+	@Override
+	public INormalizedKeyComputer createNormalizedKeyComputer() {
+		return new INormalizedKeyComputer() {
+			private static final int POSITIVE_LONG_MASK = (3 << 30);
+			private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
+			private static final int NEGATIVE_LONG_MASK = (0 << 30);
 
-            
-            private long getLong(byte[] bytes, int offset) {
-            	int l = (int)Math.ceil((double)bytes[offset]/4.0);
-            	int n = (l<8)?l:8;
-            	
-            	long r = 0;
-            	for(int i = 0 ; i < n ; i++){
-            		r <<= 8;
-            		r += (long) (bytes[offset + 1] & 0xff);
-            	}
-            	
-            	return r;
-            }
-            	
-            
-            @Override
-            public int normalize(byte[] bytes, int start, int length) {
-                long value = getLong(bytes, start);
-                
-                int highValue = (int) (value >> 32);
-                if (highValue > 0) {
-                    /** * larger than Integer.MAX */
-                    int highNmk = getKey(highValue);
-                    highNmk >>= 2;
-                    highNmk |= POSTIVE_LONG_MASK;
-                    return highNmk;
-                } else if (highValue == 0) {
-                    /** * smaller than Integer.MAX but >=0 */
-                    int lowNmk = (int) value;
-                    lowNmk >>= 2;
-                    lowNmk |= NON_NEGATIVE_INT_MASK;
-                    return lowNmk;
-                } else {
-                    /** * less than 0: have not optimized for that */
-                    int highNmk = getKey(highValue);
-                    highNmk >>= 2;
-                    highNmk |= NEGATIVE_LONG_MASK;
-                    return highNmk;
-                }
-            }
+			private long getLong(byte[] bytes, int offset) {
+				int l = (int) Math.ceil((double) bytes[offset] / 4.0);
+				int n = (l < 8) ? l : 8;
 
-            private int getKey(int value) {
-                return value ^ Integer.MIN_VALUE;
-            }
-        };
-    }
+				long r = 0;
+				for (int i = 0; i < n; i++) {
+					r <<= 8;
+					// was bytes[offset + 1], which re-read the same byte n times
+					r += (long) (bytes[offset + 1 + i] & 0xff);
+				}
+
+				return r;
+			}
+
+			@Override
+			public int normalize(byte[] bytes, int start, int length) {
+				long value = getLong(bytes, start);
+
+				int highValue = (int) (value >> 32);
+				if (highValue > 0) {
+					/* larger than Integer.MAX_VALUE */
+					int highNmk = getKey(highValue);
+					highNmk >>= 2;
+					highNmk |= POSITIVE_LONG_MASK;
+					return highNmk;
+				} else if (highValue == 0) {
+					/* fits in an int and >= 0 */
+					int lowNmk = (int) value;
+					lowNmk >>= 2;
+					lowNmk |= NON_NEGATIVE_INT_MASK;
+					return lowNmk;
+				} else {
+					/* less than 0: not optimized for that case */
+					int highNmk = getKey(highValue);
+					highNmk >>= 2;
+					highNmk |= NEGATIVE_LONG_MASK;
+					return highNmk;
+				}
+			}
+
+			private int getKey(int value) {
+				return value ^ Integer.MIN_VALUE;
+			}
+		};
+	}
 }
\ No newline at end of file
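The VLong layout assumed throughout this patch: byte 0 holds the k-mer length k, followed by ceil(k/4) payload bytes (two bits per base), and normalize() folds the first eight payload bytes into a long, big-endian. A standalone sketch of the decode, under that assumption:

    static long decodeVLong(byte[] bytes, int offset) {
        int payload = (int) Math.ceil((double) bytes[offset] / 4.0);
        int n = Math.min(payload, 8); // a long can absorb at most 8 bytes
        long r = 0;
        for (int i = 0; i < n; i++) {
            r <<= 8;
            r |= bytes[offset + 1 + i] & 0xffL; // the + i is essential
        }
        return r;
    }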
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
index 90bd12f..4078718 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
@@ -6,30 +6,32 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 
-public class KmerHashPartitioncomputerFactory implements ITuplePartitionComputerFactory {
+public class KmerHashPartitioncomputerFactory implements
+		ITuplePartitionComputerFactory {
 
-    private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 1L;
 
-    public KmerHashPartitioncomputerFactory() {
-    }
+	public KmerHashPartitioncomputerFactory() {
+	}
 
-    @Override
-    public ITuplePartitionComputer createPartitioner() {
-        return new ITuplePartitionComputer() {
-            @Override
-            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) {
-                if (nParts == 1) {
-                    return 0;
-                }
-                int startOffset = accessor.getTupleStartOffset(tIndex);
-                int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);
-                int slotLength = accessor.getFieldSlotsLength();
+	@Override
+	public ITuplePartitionComputer createPartitioner() {
+		return new ITuplePartitionComputer() {
+			@Override
+			public int partition(IFrameTupleAccessor accessor, int tIndex,
+					int nParts) {
+				if (nParts == 1) {
+					return 0;
+				}
+				int startOffset = accessor.getTupleStartOffset(tIndex);
+				int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);
+				int slotLength = accessor.getFieldSlotsLength();
 
-                ByteBuffer buf = accessor.getBuffer();
-                buf.position(startOffset + fieldOffset + slotLength);
-                long l = accessor.getBuffer().getLong();
-                return (int) (l % nParts);
-            }
-        };
-    }
+				ByteBuffer buf = accessor.getBuffer();
+				buf.position(startOffset + fieldOffset + slotLength);
+				long l = accessor.getBuffer().getLong();
+				return (int) (l % nParts);
+			}
+		};
+	}
 }
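One caveat in partition() above: Java's % keeps the sign of the dividend, so a key with its high bit set maps to a negative partition number. A defensive variant (illustrative, not part of this patch):

    static int nonNegativePartition(long key, int nParts) {
        return (int) ((key % nParts + nParts) % nParts);
    }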

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java
index 41dcb07..d88c0a0 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java
@@ -7,39 +7,41 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
-public class ByteSerializerDeserializer implements ISerializerDeserializer<Byte> {
+public class ByteSerializerDeserializer implements
+		ISerializerDeserializer<Byte> {
 
-    private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 1L;
 
-    public static final ByteSerializerDeserializer INSTANCE = new ByteSerializerDeserializer();
+	public static final ByteSerializerDeserializer INSTANCE = new ByteSerializerDeserializer();
 
-    private ByteSerializerDeserializer() {
-    }
+	private ByteSerializerDeserializer() {
+	}
 
-    @Override
-    public Byte deserialize(DataInput in) throws HyracksDataException {
-        try {
-            return in.readByte();
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
+	@Override
+	public Byte deserialize(DataInput in) throws HyracksDataException {
+		try {
+			return in.readByte();
+		} catch (IOException e) {
+			throw new HyracksDataException(e);
+		}
+	}
 
-    @Override
-    public void serialize(Byte instance, DataOutput out) throws HyracksDataException {
-        try {
-            out.writeByte(instance.intValue());
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
+	@Override
+	public void serialize(Byte instance, DataOutput out)
+			throws HyracksDataException {
+		try {
+			out.writeByte(instance.intValue());
+		} catch (IOException e) {
+			throw new HyracksDataException(e);
+		}
+	}
 
-    public static byte getByte(byte[] bytes, int offset) {
-        return bytes[offset];
-    }
+	public static byte getByte(byte[] bytes, int offset) {
+		return bytes[offset];
+	}
 
-    public static void putByte(byte val, byte[] bytes, int offset) {
-        bytes[offset] = val;
-    }
+	public static void putByte(byte val, byte[] bytes, int offset) {
+		bytes[offset] = val;
+	}
 
 }
\ No newline at end of file
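A round-trip sketch for the singleton above (illustrative; the stream classes are plain java.io):

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ByteSerializerDeserializer.INSTANCE.serialize((byte) 0x2a,
            new DataOutputStream(baos));
    Byte back = ByteSerializerDeserializer.INSTANCE.deserialize(
            new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
    assert back == 0x2a;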
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java
index 9685dd1..6477e14 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java
@@ -5,8 +5,7 @@
 import edu.uci.ics.hyracks.data.std.api.IHashable;
 import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
 
-public class LongBinaryHashFunctionFamily implements
-		IBinaryHashFunctionFamily {
+public class LongBinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
 	private static final long serialVersionUID = 1L;
 
 	@Override

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
index 9092517..661559f 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
@@ -1,4 +1,3 @@
-
 package edu.uci.ics.genomix.data.std.accessors;
 
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
@@ -6,37 +5,36 @@
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 
 public class LongHashFunctionFamily implements IBinaryHashFunctionFamily {
-    public static final IBinaryHashFunctionFamily INSTANCE = new LongHashFunctionFamily();
+	public static final IBinaryHashFunctionFamily INSTANCE = new LongHashFunctionFamily();
 
-    private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 1L;
 
-    static final int[] primeCoefficents = { 31, 23, 53, 97, 71, 337, 11, 877, 3, 29 };
+	static final int[] primeCoefficients = { 31, 23, 53, 97, 71, 337, 11, 877,
+			3, 29 };
 
-    private LongHashFunctionFamily() {
-    }
+	private LongHashFunctionFamily() {
+	}
 
-    @Override
-    public IBinaryHashFunction createBinaryHashFunction(int seed) {
-        final int coefficient = primeCoefficents[seed % primeCoefficents.length];
-        final int r = primeCoefficents[(seed + 1) % primeCoefficents.length];
+	@Override
+	public IBinaryHashFunction createBinaryHashFunction(int seed) {
+		final int coefficient = primeCoefficients[seed % primeCoefficients.length];
+		final int r = primeCoefficients[(seed + 1) % primeCoefficients.length];
 
-        return new IBinaryHashFunction() {
-            @Override
-            public int hash(byte[] bytes, int offset, int length) {
-                int h = 0;
-                int utflen = UTF8StringPointable.getUTFLength(bytes, offset);
-                int sStart = offset + 2;
-                int c = 0;
+		return new IBinaryHashFunction() {
+			@Override
+			public int hash(byte[] bytes, int offset, int length) {
+				int h = 0;
+				int utflen = UTF8StringPointable.getUTFLength(bytes, offset);
+				int sStart = offset + 2;
+				int c = 0;
 
-                while (c < utflen) {
-                    char ch = UTF8StringPointable.charAt(bytes, sStart + c);
-                    h = (coefficient * h + ch) % r;
-                    c += UTF8StringPointable.charSize(bytes, sStart + c);
-                }
-                return h;
-            }
-        };
-    }
+				while (c < utflen) {
+					char ch = UTF8StringPointable.charAt(bytes, sStart + c);
+					h = (coefficient * h + ch) % r;
+					c += UTF8StringPointable.charSize(bytes, sStart + c);
+				}
+				return h;
+			}
+		};
+	}
 }
-
-
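Despite its name, the hash above reads the field as a UTF-8 string (via UTF8StringPointable), not as a long; the seed only selects a (coefficient, modulus) pair from the prime table, yielding a family of distinct functions. Usage sketch (illustrative):

    IBinaryHashFunction h0 = LongHashFunctionFamily.INSTANCE.createBinaryHashFunction(0);
    IBinaryHashFunction h1 = LongHashFunctionFamily.INSTANCE.createBinaryHashFunction(1);
    // h0 and h1 disagree on most keys, which rehash-based operators
    // rely on to recover from skewed splits.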

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
index 361e036..b1db6f2 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
@@ -3,72 +3,75 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 
-public class MurmurHash3BinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
-    
-    public static final IBinaryHashFunctionFamily INSTANCE = new MurmurHash3BinaryHashFunctionFamily();
-    
-    private static final long serialVersionUID = 1L;
-    
-    private MurmurHash3BinaryHashFunctionFamily(){}
+public class MurmurHash3BinaryHashFunctionFamily implements
+		IBinaryHashFunctionFamily {
 
-    private static final int C1 = 0xcc9e2d51;
-    private static final int C2 = 0x1b873593;
-    private static final int C3 = 5;
-    private static final int C4 = 0xe6546b64;
-    private static final int C5 = 0x85ebca6b;
-    private static final int C6 = 0xc2b2ae35;
+	public static final IBinaryHashFunctionFamily INSTANCE = new MurmurHash3BinaryHashFunctionFamily();
 
-    @Override
-    public IBinaryHashFunction createBinaryHashFunction(final int seed) {
-        return new IBinaryHashFunction() {
-            @Override
-            public int hash(byte[] bytes, int offset, int length) {
-                int h = seed;
-                int p = offset;
-                int remain = length;
-                while (remain > 4) {
-                    int k = ((int) bytes[p]) | (((int) bytes[p + 1]) << 8) | (((int) bytes[p + 2]) << 16)
-                            | (((int) bytes[p + 3]) << 24);
-                    k *= C1;
-                    k = Integer.rotateLeft(k, 15);
-                    k *= C2;
-                    h ^= k;
-                    h = Integer.rotateLeft(h, 13);
-                    h = h * C3 + C4;
-                    p += 4;
-                    remain -= 4;
-                }
-                int k = 0;
-                for(int i = 0; remain > 0; i += 8){
-                    k ^= bytes[p++] << i;
-                    remain--;
-                }
-                k *= C1;
-                k = Integer.rotateLeft(k, 15);
-                k *= C2;
-                h ^= k;
-                //                switch (remain) {
-                //                    case 3:
-                //                        k = bytes[p++];
-                //                    case 2:
-                //                        k = (k << 8) | bytes[p++];
-                //                    case 1:
-                //                        k = (k << 8) | bytes[p++];
-                //                        k *= C1;
-                //                        k = Integer.rotateLeft(k, 15);
-                //                        k *= C2;
-                //                        h ^= k;
-                //                        h = Integer.rotateLeft(h, 13);
-                //                        h = h * C3 + C4;
-                //                }
-                h ^= length;
-                h ^= (h >>> 16);
-                h *= C5;
-                h ^= (h >>> 13);
-                h *= C6;
-                h ^= (h >>> 16);
-                return h;
-            }
-        };
-    }
+	private static final long serialVersionUID = 1L;
+
+	private MurmurHash3BinaryHashFunctionFamily() {
+	}
+
+	private static final int C1 = 0xcc9e2d51;
+	private static final int C2 = 0x1b873593;
+	private static final int C3 = 5;
+	private static final int C4 = 0xe6546b64;
+	private static final int C5 = 0x85ebca6b;
+	private static final int C6 = 0xc2b2ae35;
+
+	@Override
+	public IBinaryHashFunction createBinaryHashFunction(final int seed) {
+		return new IBinaryHashFunction() {
+			@Override
+			public int hash(byte[] bytes, int offset, int length) {
+				int h = seed;
+				int p = offset;
+				int remain = length;
+				while (remain > 4) {
+					int k = ((int) bytes[p]) | (((int) bytes[p + 1]) << 8)
+							| (((int) bytes[p + 2]) << 16)
+							| (((int) bytes[p + 3]) << 24);
+					k *= C1;
+					k = Integer.rotateLeft(k, 15);
+					k *= C2;
+					h ^= k;
+					h = Integer.rotateLeft(h, 13);
+					h = h * C3 + C4;
+					p += 4;
+					remain -= 4;
+				}
+				int k = 0;
+				for (int i = 0; remain > 0; i += 8) {
+					k ^= bytes[p++] << i;
+					remain--;
+				}
+				k *= C1;
+				k = Integer.rotateLeft(k, 15);
+				k *= C2;
+				h ^= k;
+				// switch (remain) {
+				// case 3:
+				// k = bytes[p++];
+				// case 2:
+				// k = (k << 8) | bytes[p++];
+				// case 1:
+				// k = (k << 8) | bytes[p++];
+				// k *= C1;
+				// k = Integer.rotateLeft(k, 15);
+				// k *= C2;
+				// h ^= k;
+				// h = Integer.rotateLeft(h, 13);
+				// h = h * C3 + C4;
+				// }
+				h ^= length;
+				h ^= (h >>> 16);
+				h *= C5;
+				h ^= (h >>> 13);
+				h *= C6;
+				h ^= (h >>> 16);
+				return h;
+			}
+		};
+	}
 }
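Two departures from reference MurmurHash3 worth knowing about: the block loop runs while remain > 4, so a trailing block of exactly 4 bytes goes through the tail path, and the tail mixes sign-extended bytes (bytes[p++] << i without & 0xff). The function is still internally consistent, which is all a partitioner needs. A quick avalanche probe (illustrative):

    IBinaryHashFunction f = MurmurHash3BinaryHashFunctionFamily.INSTANCE
            .createBinaryHashFunction(17);
    byte[] a = { 1, 2, 3, 4, 5 };
    byte[] b = { 1, 2, 3, 4, 4 }; // last byte differs by one bit
    // a well-mixed hash flips roughly half of the 32 output bits
    int flipped = Integer.bitCount(f.hash(a, 0, 5) ^ f.hash(b, 0, 5));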

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java
index 2cd66e9..b9a0443 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java
@@ -5,8 +5,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import edu.uci.ics.hyracks.data.std.api.IHashable;
 
-public class VLongBinaryHashFunctionFamily implements
-		IBinaryHashFunctionFamily {
+public class VLongBinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
 	private static final long serialVersionUID = 1L;
 
 	@Override

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
index 65978d7..77cfd6e 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
@@ -8,134 +8,130 @@
 import edu.uci.ics.hyracks.data.std.api.IPointable;
 import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
 
-public final class VLongPointable extends AbstractPointable implements IHashable, IComparable, INumeric {
+public final class VLongPointable extends AbstractPointable implements
+		IHashable, IComparable, INumeric {
 	static private int max = 65535;
-    public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
-        private static final long serialVersionUID = 1L;
+	public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
+		private static final long serialVersionUID = 1L;
 
-        @Override
-        public boolean isFixedLength() {
-            return false;
-        }
+		@Override
+		public boolean isFixedLength() {
+			return false;
+		}
 
-        @Override
-        public int getFixedLength() {
-            return -1;
-        }
-    };
+		@Override
+		public int getFixedLength() {
+			return -1;
+		}
+	};
 
-    public static final IPointableFactory FACTORY = new IPointableFactory() {
-        private static final long serialVersionUID = 1L;
+	public static final IPointableFactory FACTORY = new IPointableFactory() {
+		private static final long serialVersionUID = 1L;
 
-        @Override
-        public IPointable createPointable() {
-            return new VLongPointable();
-        }
+		@Override
+		public IPointable createPointable() {
+			return new VLongPointable();
+		}
 
-        @Override
-        public ITypeTraits getTypeTraits() {
-            return TYPE_TRAITS;
-        }
-    };
+		@Override
+		public ITypeTraits getTypeTraits() {
+			return TYPE_TRAITS;
+		}
+	};
 
-    public static long getLong(byte[] bytes, int start) {
-    	int l = (int)Math.ceil((double)bytes[start]/4.0);
-    	int n = (l<8)?l:8;
-    	
-    	long r = 0;
-    	for(int i = 0 ; i < n ; i++){
-    		r <<= 8;
-    		r += (long) (bytes[start + 1] & 0xff);
-    	}
-    	
-    	return r;
-    }
-    
-    public long getLong() {
-        return getLong(bytes, start);
-    }
-    
-    public byte[] postIncrement() {
-    	int i = start + 1;
-    	int l = (int)Math.ceil(bytes[start]/4);
-    	while(i <= start+l){
-    		bytes[i] += 1;
-    		if(bytes[i] != 0){
-    			break;
-    		}
-    	}
-        return bytes;
-    }
+	public static long getLong(byte[] bytes, int start) {
+		int l = (int) Math.ceil((double) bytes[start] / 4.0);
+		int n = (l < 8) ? l : 8;
 
-    @Override
-    public int compareTo(IPointable pointer) {
-        return compareTo(pointer.getByteArray(), pointer.getStartOffset(), pointer.getLength());
-    }
+		long r = 0;
+		for (int i = 0; i < n; i++) {
+			r <<= 8;
+			// was bytes[start + 1], which re-read the same byte n times
+			r += (long) (bytes[start + 1 + i] & 0xff);
+		}
 
-    @Override
-    public int compareTo(byte[] bytes, int start, int length) {
-    	
-    	int be = this.start;
-    	int n = (this.bytes[be] < bytes[start])? this.bytes[be] : bytes[start];
-    	int l = (int)Math.ceil(n/4);
-    	for(int i = 0 ; i <= l ; i++){
-    		if(this.bytes[i]<bytes[i]){
-    			return -1;
-    		}
-    		else if(this.bytes[i]>bytes[i]){
-    			return 1;
-    		}
-    		
-    	}
-    	    	
-        return (this.bytes[be] < bytes[start])? -1 : ((this.bytes[be] > bytes[start])?1:0);
-    }
+		return r;
+	}
 
-    @Override
-    public int hash() {//BKDRHash
-    	int seed = 131; // 31 131 1313 13131 131313 etc..
-    	int hash = 0;
-    	int l =  (int)Math.ceil((double)bytes[start]/4.0);
-    	for(int i = start + 1 ; i <= start + l ; i++)
-    	{
-    		hash = hash * seed + bytes[i];
-    	}
-    	return (hash & 0x7FFFFFFF);
-    }
+	public long getLong() {
+		return getLong(bytes, start);
+	}
 
-    @Override
-    public byte byteValue() {
-        return (byte) bytes[start+1];
-    }
+	public byte[] postIncrement() {
+		int i = start + 1;
+		int l = (int) Math.ceil((double) bytes[start] / 4.0); // was integer division, which truncated
+		while (i <= start + l) {
+			bytes[i] += 1;
+			if (bytes[i] != 0) {
+				break;
+			}
+			i += 1; // advance so the carry propagates instead of looping on one byte
+		}
+		return bytes;
+	}
 
-    @Override
-    public short shortValue() {
-    	
-        return (short) ((bytes[start+2] & 0xff) << 8 + bytes[start+1] & 0xff);
-    }
+	@Override
+	public int compareTo(IPointable pointer) {
+		return compareTo(pointer.getByteArray(), pointer.getStartOffset(),
+				pointer.getLength());
+	}
 
-    @Override
-    public int intValue() {
-        return (int) ( (bytes[start + 4] & 0xff) << 24 +  (bytes[start + 3] & 0xff) <<16 + 
-        		(bytes[start + 2] & 0xff) << 8 + bytes[start + 1] & 0xff);
-    }
+	@Override
+	public int compareTo(byte[] bytes, int start, int length) {
+		int be = this.start;
+		int n = (this.bytes[be] < bytes[start]) ? this.bytes[be] : bytes[start];
+		int l = (int) Math.ceil((double) n / 4.0);
+		// compare payload bytes relative to each value's own offset
+		// (the indices were previously absolute, ignoring be and start)
+		for (int i = 1; i <= l; i++) {
+			if (this.bytes[be + i] < bytes[start + i]) {
+				return -1;
+			} else if (this.bytes[be + i] > bytes[start + i]) {
+				return 1;
+			}
+		}
 
-    
-    
-    @Override
-    public long longValue() {
-        return getLong();
-    }
+		return (this.bytes[be] < bytes[start]) ? -1
+				: ((this.bytes[be] > bytes[start]) ? 1 : 0);
+	}
 
-    @Override
-    public float floatValue() {
-        return getLong();
-    }
+	@Override
+	public int hash() { // BKDRHash
+		int seed = 131; // 31 131 1313 13131 131313 etc..
+		int hash = 0;
+		int l = (int) Math.ceil((double) bytes[start] / 4.0);
+		for (int i = start + 1; i <= start + l; i++) {
+			hash = hash * seed + bytes[i];
+		}
+		return (hash & 0x7FFFFFFF);
+	}
 
-    @Override
-    public double doubleValue() {
-        return getLong();
-    }
+	@Override
+	public byte byteValue() {
+		return (byte) bytes[start + 1];
+	}
+
+	@Override
+	public short shortValue() {
+		// parenthesized: '+' binds tighter than '<<', which corrupted the result
+		return (short) (((bytes[start + 2] & 0xff) << 8) | (bytes[start + 1] & 0xff));
+	}
+
+	@Override
+	public int intValue() {
+		return ((bytes[start + 4] & 0xff) << 24) | ((bytes[start + 3] & 0xff) << 16)
+				| ((bytes[start + 2] & 0xff) << 8) | (bytes[start + 1] & 0xff);
+	}
+
+	@Override
+	public long longValue() {
+		return getLong();
+	}
+
+	@Override
+	public float floatValue() {
+		return getLong();
+	}
+
+	@Override
+	public double doubleValue() {
+		return getLong();
+	}
 }
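Usage sketch for the pointable above (illustrative; set(...) is assumed to be inherited from AbstractPointable, and encoded is assumed to follow the length-byte-plus-payload layout):

    VLongPointable p = new VLongPointable();
    p.set(encoded, 0, encoded.length);
    long prefix = p.getLong(); // folds up to 8 payload bytes into a long
    int bucket = p.hash();     // BKDR hash over the payload bytes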

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
index 78c37b7..20c6a28 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -7,18 +7,20 @@
 import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
 
-public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
-    private static final long serialVersionUID = 1L;
-    private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
-    private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+public class ConnectorPolicyAssignmentPolicy implements
+		IConnectorPolicyAssignmentPolicy {
+	private static final long serialVersionUID = 1L;
+	private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+	private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
 
-    @Override
-    public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
-            int[] fanouts) {
-        if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
-            return senderSideMaterializePolicy;
-        } else {
-            return pipeliningPolicy;
-        }
-    }
+	@Override
+	public IConnectorPolicy getConnectorPolicyAssignment(
+			IConnectorDescriptor c, int nProducers, int nConsumers,
+			int[] fanouts) {
+		if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+			return senderSideMaterializePolicy;
+		} else {
+			return pipeliningPolicy;
+		}
+	}
 }
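Typical wiring (illustrative; assumes JobSpecification exposes setConnectorPolicyAssignmentPolicy, as elsewhere in Hyracks): installing the policy makes every m-to-n partitioning-merging connector materialize on the send side, while all other connectors keep pipelining.

    JobSpecification spec = new JobSpecification();
    spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());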

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index 3f0f194..ebc5346 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -22,263 +22,278 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
-public class FileScanDescriptor extends AbstractSingleActivityOperatorDescriptor {
+public class FileScanDescriptor extends
+		AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
-    private int k;
-    private Path [] filesplit = null ;
-    private String pathSurfix ;
-    private int byteNum;
+	private static final long serialVersionUID = 1L;
+	private int k;
+	private Path[] filesplit = null;
+	private String pathSurfix;
+	private int byteNum;
 
-    public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k, String path) {
-        super(spec, 0, 1);
-        this.k = k;
-        this.pathSurfix = path;
-        
-        byteNum = (byte)Math.ceil((double)k/4.0);
-        //recordDescriptors[0] = news RecordDescriptor(
-        //		new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
-        recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {
-                null, null});
-    }
+	public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k,
+			String path) {
+		super(spec, 0, 1);
+		this.k = k;
+		this.pathSurfix = path;
 
-    public FileScanDescriptor(JobSpecification jobSpec, int kmers,
-			Path[] inputPaths) {
-    	super(jobSpec, 0, 1);
-        this.k = k;
-        this.filesplit = inputPaths;
-        this.pathSurfix = inputPaths[0].toString();
-        //recordDescriptors[0] = news RecordDescriptor(
-        //		new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
-        recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {
-                null, ByteSerializerDeserializer.INSTANCE });
+		byteNum = (byte) Math.ceil((double) k / 4.0);
+		// recordDescriptors[0] = new RecordDescriptor(
+		// new ISerializerDeserializer[] {
+		// UTF8StringSerializerDeserializer.INSTANCE });
+		recordDescriptors[0] = new RecordDescriptor(
+				new ISerializerDeserializer[] { null, null });
 	}
 
-	public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+	public FileScanDescriptor(JobSpecification jobSpec, int kmers,
+			Path[] inputPaths) {
+		super(jobSpec, 0, 1);
+		this.k = kmers; // was "this.k = k", a self-assignment that left k unset
+		this.filesplit = inputPaths;
+		this.pathSurfix = inputPaths[0].toString();
+		// recordDescriptors[0] = new RecordDescriptor(
+		// new ISerializerDeserializer[] {
+		// UTF8StringSerializerDeserializer.INSTANCE });
+		recordDescriptors[0] = new RecordDescriptor(
+				new ISerializerDeserializer[] { null,
+						ByteSerializerDeserializer.INSTANCE });
+	}
 
-        final int temp = partition;
+	public IOperatorNodePushable createPushRuntime(
+			final IHyracksTaskContext ctx,
+			IRecordDescriptorProvider recordDescProvider, int partition,
+			int nPartitions) {
 
-        // TODO Auto-generated method stub
-        return (IOperatorNodePushable) new AbstractUnaryOutputSourceOperatorNodePushable() {
-            private ArrayTupleBuilder tupleBuilder;
-            private ByteBuffer outputBuffer;
-            private FrameTupleAppender outputAppender;
+		final int temp = partition;
 
-            @SuppressWarnings("resource")
+		// TODO Auto-generated method stub
+		return (IOperatorNodePushable) new AbstractUnaryOutputSourceOperatorNodePushable() {
+			private ArrayTupleBuilder tupleBuilder;
+			private ByteBuffer outputBuffer;
+			private FrameTupleAppender outputAppender;
+
+			@SuppressWarnings("resource")
 			@Override
-            public void initialize() {
+			public void initialize() {
 
-                tupleBuilder = new ArrayTupleBuilder(2);
-                outputBuffer = ctx.allocateFrame();
-                outputAppender = new FrameTupleAppender(ctx.getFrameSize());
-                outputAppender.reset(outputBuffer, true);
-                try {// one try with multiple catch?
-                    writer.open();
-                    String s = pathSurfix + String.valueOf(temp);
-                    
-                    File tf = new File(s);
-                    
-                    File[] fa = tf.listFiles();
+				tupleBuilder = new ArrayTupleBuilder(2);
+				outputBuffer = ctx.allocateFrame();
+				outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+				outputAppender.reset(outputBuffer, true);
+				try {// one try with multiple catch?
+					writer.open();
+					String s = pathSurfix + String.valueOf(temp);
 
-                    for(int i = 0 ; i < fa.length ; i++){
-                        BufferedReader readsfile = new BufferedReader(new InputStreamReader(new FileInputStream(fa[i])));
-                        String read = readsfile.readLine();
-                        while (read != null) {
-                            read = readsfile.readLine();
-                            SplitReads(read.getBytes());
-                            //read.getBytes();
+					File tf = new File(s);
 
-                            read = readsfile.readLine();
-                            read = readsfile.readLine();
+					File[] fa = tf.listFiles();
 
-                            read = readsfile.readLine();
-                        }
-                    }
-                    if (outputAppender.getTupleCount() > 0) {
-                        FrameUtils.flushFrame(outputBuffer, writer);
-                    }
-                    outputAppender = null;
-                    outputBuffer = null;
-                    // sort code for external sort here?
-                    writer.close();
-                } catch (Exception e) {
-                    // TODO Auto-generated catch block
-                    throw new IllegalStateException(e);
-                }
-            }
+					for (int i = 0; i < fa.length; i++) {
+						BufferedReader readsfile = new BufferedReader(
+								new InputStreamReader(
+										new FileInputStream(fa[i])));
+						String read = readsfile.readLine();
+						while (read != null) {
+							read = readsfile.readLine();
+							SplitReads(read.getBytes());
+							// read.getBytes();
 
-            private byte[] CompressKmer(byte[] array, int start) {
-                // a: 00; c: 01; G: 10; T: 11
-            	
-            	byte[] bytes = new byte[byteNum+1];
-            	bytes[0] = (byte) k;
-            	
-            	byte l = 0;
-            	int count = 0;
-            	int bcount = 0;
-            	
-                for (int i = start; i < start + k; i++) {
-                    l <<= 2;
-                    switch (array[i]) {
-                        case 'A':
-                        case 'a':
-                            l |= 0;
-                            break;
-                        case 'C':
-                        case 'c':
-                            l |= 1;
-                            break;
-                        case 'G':
-                        case 'g':
-                            l |= 2;
-                            break;
-                        case 'T':
-                        case 't':
-                            l |= 3;
-                            break;
-                    }
-                    count += 2;
-                    if(count%8==0){
-                    	bcount += 1;
-                    	bytes[bcount] = l;
-                    	count = 0;
-                    }
-                }
-                bytes[bcount + 1] = l;
-                return bytes;
-            }
+							read = readsfile.readLine();
+							read = readsfile.readLine();
 
-            private byte GetBitmap(byte t) {
-                byte r = 0;
-                switch (t) {
-                    case 'A':
-                    case 'a':
-                        r = 1;
-                        break;
-                    case 'C':
-                    case 'c':
-                        r = 2;
-                        break;
-                    case 'G':
-                    case 'g':
-                        r = 4;
-                        break;
-                    case 'T':
-                    case 't':
-                        r = 8;
-                        break;
-                }
-                return r;
-            }
+							read = readsfile.readLine();
+						}
+					}
+					if (outputAppender.getTupleCount() > 0) {
+						FrameUtils.flushFrame(outputBuffer, writer);
+					}
+					outputAppender = null;
+					outputBuffer = null;
+					// sort code for external sort here?
+					writer.close();
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					throw new IllegalStateException(e);
+				}
+			}
 
-            private byte ConvertSymbol(byte t) {
-                byte r = 0;
-                switch (t) {
-                    case 'A':
-                    case 'a':
-                        r = 0;
-                        break;
-                    case 'C':
-                    case 'c':
-                        r = 1;
-                        break;
-                    case 'G':
-                    case 'g':
-                        r = 2;
-                        break;
-                    case 'T':
-                    case 't':
-                        r = 3;
-                        break;
-                }
-                return r;
-            }
-            
-            void MoveKmer(byte[] bytes, byte c){
-            	byte filter0 = (byte) 0xC0;
-            	byte filter1 = (byte) 0xFC;
-            	byte filter2 = 0;
-            	
-            	int r = byteNum*8 - 2*k;
-            	r = 8 - r;
-            	for(int i = 0 ; i < r ; i++){
-            		filter2 <<= 1;
-            		filter2 |= 1;
-            	}
+			private byte[] CompressKmer(byte[] array, int start) {
+				// a: 00; c: 01; G: 10; T: 11
 
-                int i = byteNum;
-                bytes[i] <<= 2;
-                bytes[i] &= filter2;
-                i -= 1;
-            	while(i > 0){
-            		byte f = (byte) (bytes[i] & filter0);
-            		f >>= 6;
-                	bytes[i+1] |= f;
-            		bytes[i] <<= 2;
-            		bytes[i] &= filter1;
-            	}
-            	bytes[i+1] |= ConvertSymbol(c);
-            }
+				byte[] bytes = new byte[byteNum + 1];
+				bytes[0] = (byte) k;
 
-            private void SplitReads(byte[] array) {
-                try {
-                	byte[] bytes=null;
-                	
-                	byte pre = 0, next = 0;
-                    byte r;
+				byte l = 0;
+				int count = 0;
+				int bcount = 0;
 
-                    for (int i = 0; i < array.length - k + 1; i++) {
-                        if (0 == i) {
-                            bytes = CompressKmer(array, i);
-                        } else {
-                        	MoveKmer(bytes, array[i + k - 1]);
-                            /*l <<= 2;
-                            l &= window;
-                            l |= ConvertSymbol(array[i + k - 1]);*/
-                            pre = GetBitmap(array[i - 1]);
-                        }
-                        if (i + k != array.length) {
-                            next = GetBitmap(array[i + k]);
-                        }
+				for (int i = start; i < start + k; i++) {
+					l <<= 2;
+					switch (array[i]) {
+					case 'A':
+					case 'a':
+						l |= 0;
+						break;
+					case 'C':
+					case 'c':
+						l |= 1;
+						break;
+					case 'G':
+					case 'g':
+						l |= 2;
+						break;
+					case 'T':
+					case 't':
+						l |= 3;
+						break;
+					}
+					count += 2;
+					if (count % 8 == 0) {
+						bcount += 1;
+						bytes[bcount] = l;
+						count = 0;
+					}
+				}
+				bytes[bcount + 1] = l;
+				return bytes;
+			}
 
-                        r = 0;
-                        r |= pre;
-                        r <<= 4;
-                        r |= next;
+			private byte GetBitmap(byte t) {
+				byte r = 0;
+				switch (t) {
+				case 'A':
+				case 'a':
+					r = 1;
+					break;
+				case 'C':
+				case 'c':
+					r = 2;
+					break;
+				case 'G':
+				case 'g':
+					r = 4;
+					break;
+				case 'T':
+				case 't':
+					r = 8;
+					break;
+				}
+				return r;
+			}
 
-                        /*System.out.print(l);
-                        System.out.print(' ');
-                        System.out.print(r);
-                        System.out.println();*/
+			private byte ConvertSymbol(byte t) {
+				byte r = 0;
+				switch (t) {
+				case 'A':
+				case 'a':
+					r = 0;
+					break;
+				case 'C':
+				case 'c':
+					r = 1;
+					break;
+				case 'G':
+				case 'g':
+					r = 2;
+					break;
+				case 'T':
+				case 't':
+					r = 3;
+					break;
+				}
+				return r;
+			}
 
-                        tupleBuilder.reset();
+			void MoveKmer(byte[] bytes, byte c) {
+				byte filter0 = (byte) 0xC0;
+				byte filter1 = (byte) 0xFC;
+				byte filter2 = 0;
 
-                        //tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE, l);
-                        tupleBuilder.addField(bytes, 0, byteNum + 1);
-                        tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);
-                        
-                        
-                        //int[] a = tupleBuilder.getFieldEndOffsets();
-                        //int b = tupleBuilder.getSize();
-                        //byte[] c = tupleBuilder.getByteArray();
+				int r = byteNum * 8 - 2 * k;
+				r = 8 - r;
+				for (int i = 0; i < r; i++) {
+					filter2 <<= 1;
+					filter2 |= 1;
+				}
 
-                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                                tupleBuilder.getSize())) {
-                            FrameUtils.flushFrame(outputBuffer, writer);
-                            outputAppender.reset(outputBuffer, true);
-                            if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),
-                                    0, tupleBuilder.getSize())) {
-                                throw new IllegalStateException(
-                                        "Failed to copy an record into a frame: the record size is too large.");
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    throw new IllegalStateException(e);
-                }
-            }
-        };
+				int i = byteNum;
+				bytes[i] <<= 2;
+				bytes[i] &= filter2;
+				i -= 1;
+				while (i > 0) {
+					byte f = (byte) (bytes[i] & filter0);
+					f >>= 6;
+					bytes[i + 1] |= f;
+					bytes[i] <<= 2;
+					bytes[i] &= filter1;
+					i -= 1; // walk down the array; without this the loop never terminated
+				}
+				bytes[i + 1] |= ConvertSymbol(c);
+			}
 
-    }
+			private void SplitReads(byte[] array) {
+				try {
+					byte[] bytes = null;
+
+					byte pre = 0, next = 0;
+					byte r;
+
+					for (int i = 0; i < array.length - k + 1; i++) {
+						if (0 == i) {
+							bytes = CompressKmer(array, i);
+						} else {
+							MoveKmer(bytes, array[i + k - 1]);
+							/*
+							 * l <<= 2; l &= window; l |= ConvertSymbol(array[i
+							 * + k - 1]);
+							 */
+							pre = GetBitmap(array[i - 1]);
+						}
+						if (i + k != array.length) {
+							next = GetBitmap(array[i + k]);
+						}
+
+						r = 0;
+						r |= pre;
+						r <<= 4;
+						r |= next;
+
+						/*
+						 * System.out.print(l); System.out.print(' ');
+						 * System.out.print(r); System.out.println();
+						 */
+
+						tupleBuilder.reset();
+
+						// tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE,
+						// l);
+						tupleBuilder.addField(bytes, 0, byteNum + 1);
+						tupleBuilder.addField(
+								ByteSerializerDeserializer.INSTANCE, r);
+
+						// int[] a = tupleBuilder.getFieldEndOffsets();
+						// int b = tupleBuilder.getSize();
+						// byte[] c = tupleBuilder.getByteArray();
+
+						if (!outputAppender.append(
+								tupleBuilder.getFieldEndOffsets(),
+								tupleBuilder.getByteArray(), 0,
+								tupleBuilder.getSize())) {
+							FrameUtils.flushFrame(outputBuffer, writer);
+							outputAppender.reset(outputBuffer, true);
+							if (!outputAppender.append(
+									tupleBuilder.getFieldEndOffsets(),
+									tupleBuilder.getByteArray(), 0,
+									tupleBuilder.getSize())) {
+								throw new IllegalStateException(
+										"Failed to copy a record into a frame: the record size is too large.");
+							}
+						}
+					}
+				} catch (Exception e) {
+					throw new IllegalStateException(e);
+				}
+			}
+		};
+
+	}
 }
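CompressKmer packs bases two bits apiece, four to a byte, most-significant bits first, behind a leading length byte. A decode sketch for the full-byte case (illustrative; when k is not a multiple of 4, the code above leaves the final partial group in the low bits of the last byte, which would need separate handling):

    static String decodeKmer(byte[] bytes) {
        final char[] SYMBOL = { 'A', 'C', 'G', 'T' };
        int k = bytes[0];
        StringBuilder sb = new StringBuilder(k);
        for (int i = 0; i < k; i++) {
            int b = bytes[1 + i / 4] & 0xff;
            int shift = 6 - 2 * (i % 4); // first base sits in the high bits
            sb.append(SYMBOL[(b >> shift) & 3]);
        }
        return sb.toString();
    }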

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java
index abcb54e..dee3a72 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java
@@ -35,125 +35,145 @@
 

 public class GenKmerDescriptor extends AbstractOperatorDescriptor {

 

-    private static final long serialVersionUID = 1L;

-    private static final int SPLIT_ACTIVITY_ID = 0;

-    private static final int MERGE_ACTIVITY_ID = 1;

-    private final int framesLimit;

-    private final int k;

+	private static final long serialVersionUID = 1L;

+	private static final int SPLIT_ACTIVITY_ID = 0;

+	private static final int MERGE_ACTIVITY_ID = 1;

+	private final int framesLimit;

+	private final int k;

 

-    public GenKmerDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int k) {

-        super(spec, 1, 1);

-        this.framesLimit = framesLimit;

-        this.k = k;

+	public GenKmerDescriptor(IOperatorDescriptorRegistry spec, int framesLimit,

+			int k) {

+		super(spec, 1, 1);

+		this.framesLimit = framesLimit;

+		this.k = k;

 

-        // TODO Auto-generated constructor stub

-        recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {

-                Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,

-                IntegerSerializerDeserializer.INSTANCE });

-    }

+		// TODO Auto-generated constructor stub

+		recordDescriptors[0] = new RecordDescriptor(

+				new ISerializerDeserializer[] {

+						Integer64SerializerDeserializer.INSTANCE,

+						ByteSerializerDeserializer.INSTANCE,

+						IntegerSerializerDeserializer.INSTANCE });

+	}

 

-    @Override

-    public void contributeActivities(IActivityGraphBuilder builder) {

-        // TODO Auto-generated method stub

-        SplitActivity sa = new SplitActivity(new ActivityId(odId, SPLIT_ACTIVITY_ID));

-        MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));

-        builder.addActivity(this, sa);

-        builder.addSourceEdge(0, sa, 0);

+	@Override

+	public void contributeActivities(IActivityGraphBuilder builder) {

+		// TODO Auto-generated method stub

+		SplitActivity sa = new SplitActivity(new ActivityId(odId,

+				SPLIT_ACTIVITY_ID));

+		MergeActivity ma = new MergeActivity(new ActivityId(odId,

+				MERGE_ACTIVITY_ID));

+		builder.addActivity(this, sa);

+		builder.addSourceEdge(0, sa, 0);

 

-        builder.addActivity(this, ma);

-        builder.addTargetEdge(0, ma, 0);

+		builder.addActivity(this, ma);

+		builder.addTargetEdge(0, ma, 0);

 

-        builder.addBlockingEdge(sa, ma);

-    }

+		builder.addBlockingEdge(sa, ma);

+	}

 

-    private class SplitActivity extends AbstractActivityNode {

-        /**

+	private class SplitActivity extends AbstractActivityNode {

+		/**

 		 * 

 		 */

-        private static final long serialVersionUID = 1L;

+		private static final long serialVersionUID = 1L;

 

-        public SplitActivity(ActivityId activityID) {

-            super(activityID);

-            // TODO Auto-generated constructor stub

-        }

+		public SplitActivity(ActivityId activityID) {

+			super(activityID);

+			// TODO Auto-generated constructor stub

+		}

 

-        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,

-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)

-                throws HyracksDataException {

-            // TODO Auto-generated method stub

-            //IHyracksTaskContext ctx, int k, RecordDescriptor rd_in, int buffer_size

-            KmerSplitOperatorNodePushable op = new KmerSplitOperatorNodePushable(ctx, k, new RecordDescriptor(

-                    new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE }), framesLimit,

-                    new TaskId(this.id, partition));

-            return op;

-        }

-    }

+		public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,

+				IRecordDescriptorProvider recordDescProvider, int partition,

+				int nPartitions) throws HyracksDataException {

+			// TODO Auto-generated method stub

+			// IHyracksTaskContext ctx, int k, RecordDescriptor rd_in, int

+			// buffer_size

+			KmerSplitOperatorNodePushable op = new KmerSplitOperatorNodePushable(

+					ctx,

+					k,

+					new RecordDescriptor(

+							new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE }),

+					framesLimit, new TaskId(this.id, partition));

+			return op;

+		}

+	}

 

-    public static class SplitTaskState extends AbstractStateObject {

-        List<IFrameReader> runs;

+	public static class SplitTaskState extends AbstractStateObject {

+		List<IFrameReader> runs;

 

-        public SplitTaskState() {

-        }

+		public SplitTaskState() {

+		}

 

-        public SplitTaskState(JobId jobId, TaskId taskId, List<IFrameReader> runs) {

-            super(jobId, taskId);

-            this.runs = runs;

-        }

+		public SplitTaskState(JobId jobId, TaskId taskId,

+				List<IFrameReader> runs) {

+			super(jobId, taskId);

+			this.runs = runs;

+		}

 

-        @Override

-        public void toBytes(DataOutput out) throws IOException {

+		@Override

+		public void toBytes(DataOutput out) throws IOException {

 

-        }

+		}

 

-        @Override

-        public void fromBytes(DataInput in) throws IOException {

+		@Override

+		public void fromBytes(DataInput in) throws IOException {

 

-        }

-    }

+		}

+	}

 

-    private class MergeActivity extends AbstractActivityNode {

+	private class MergeActivity extends AbstractActivityNode {

 

-        /**

+		/**

 		 * 

 		 */

-        private static final long serialVersionUID = 1L;

+		private static final long serialVersionUID = 1L;

 

-        public MergeActivity(ActivityId id) {

-            super(id);

-            // TODO Auto-generated constructor stub

-        }

+		public MergeActivity(ActivityId id) {

+			super(id);

+			// TODO Auto-generated constructor stub

+		}

 

-        @Override

-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,

-                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)

-                throws HyracksDataException {

-            // TODO Auto-generated method stub

-            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {

-                @Override

-                public void initialize() throws HyracksDataException {

-                    SplitTaskState state = (SplitTaskState) ctx.getStateObject(new TaskId(new ActivityId(

-                            getOperatorId(), SPLIT_ACTIVITY_ID), partition));

-                    //List<IFrameReader> runs = runs = new LinkedList<IFrameReader>();;

+		@Override

+		public IOperatorNodePushable createPushRuntime(

+				final IHyracksTaskContext ctx,

+				IRecordDescriptorProvider recordDescProvider,

+				final int partition, int nPartitions)

+				throws HyracksDataException {

+			// TODO Auto-generated method stub

+			IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {

+				@Override

+				public void initialize() throws HyracksDataException {

+					SplitTaskState state = (SplitTaskState) ctx

+							.getStateObject(new TaskId(new ActivityId(

+									getOperatorId(), SPLIT_ACTIVITY_ID),

+									partition));

+					// List<IFrameReader> runs = runs = new

+					// LinkedList<IFrameReader>();;

 

-                    IBinaryComparator[] comparators = new IBinaryComparator[1];

-                    IBinaryComparatorFactory cf = PointableBinaryComparatorFactory.of(LongPointable.FACTORY);

-                    comparators[0] = cf.createBinaryComparator();

+					IBinaryComparator[] comparators = new IBinaryComparator[1];

+					IBinaryComparatorFactory cf = PointableBinaryComparatorFactory

+							.of(LongPointable.FACTORY);

+					comparators[0] = cf.createBinaryComparator();

 

-                    //int necessaryFrames = Math.min(runs.size() + 2, framesLimit);

+					// int necessaryFrames = Math.min(runs.size() + 2,

+					// framesLimit);

 

-                    FrameSorter frameSorter = new FrameSorter(

-                            ctx,

-                            new int[] { 0 },

-                            new Integer64NormalizedKeyComputerFactory(),

-                            new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                            recordDescriptors[0]);

+					FrameSorter frameSorter = new FrameSorter(

+							ctx,

+							new int[] { 0 },

+							new Integer64NormalizedKeyComputerFactory(),

+							new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+									.of(LongPointable.FACTORY) },

+							recordDescriptors[0]);

 

-                    ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, state.runs,

-                            new int[] { 0 }, comparators, recordDescriptors[0], framesLimit, writer);

-                    merger.process();

-                }

-            };

-            return op;

-        }

-    }

+					ExternalSortRunMerger merger = new ExternalSortRunMerger(

+							ctx, frameSorter, state.runs, new int[] { 0 },

+							comparators, recordDescriptors[0], framesLimit,

+							writer);

+					merger.process();

+				}

+			};

+			return op;

+		}

+	}

 }

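A note on the split/merge handoff above: SplitActivity's pushable spills sorted runs and registers them in a SplitTaskState keyed by its TaskId, and MergeActivity rebuilds that TaskId from SPLIT_ACTIVITY_ID plus the shared partition number to fetch those runs for ExternalSortRunMerger. The sketch below isolates the pattern in plain Java; TinyState and REGISTRY are hypothetical stand-ins for the state registry behind ctx.setStateObject/ctx.getStateObject, not code from this branch.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StateHandoffSketch {
    // Stand-in for SplitTaskState: just the list of spilled runs.
    static class TinyState {
        final List<String> runs;
        TinyState(List<String> runs) { this.runs = runs; }
    }

    // Stand-in for the per-job registry behind ctx.setStateObject(...).
    static final Map<String, TinyState> REGISTRY = new HashMap<>();

    static void splitPhase(int partition) {
        List<String> runs = new ArrayList<>();
        runs.add("run-" + partition + "-0"); // each sorted, flushed buffer set becomes one run
        runs.add("run-" + partition + "-1");
        REGISTRY.put("SPLIT:" + partition, new TinyState(runs)); // ctx.setStateObject(state)
    }

    static void mergePhase(int partition) {
        // the consumer derives the producer's key, as MergeActivity does with
        // new TaskId(new ActivityId(getOperatorId(), SPLIT_ACTIVITY_ID), partition)
        TinyState state = REGISTRY.get("SPLIT:" + partition);
        System.out.println("merging " + state.runs); // ExternalSortRunMerger.process() over the runs
    }

    public static void main(String[] args) {
        splitPhase(0);
        mergePhase(0);
    }
}

The indirection exists because the two activities run as separate tasks; they only need to agree on how the key is derived.
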
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
index 31e6b85..aa7dba4 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
@@ -8,30 +8,31 @@
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;

 

 public class KMerWriterFactory implements ITupleWriterFactory {

-    private static final long serialVersionUID = 1L;

+	private static final long serialVersionUID = 1L;

 

-    @Override

-    public ITupleWriter getTupleWriter() {

-        return new ITupleWriter() {

-            byte newLine = "\n".getBytes()[0];

+	@Override

+	public ITupleWriter getTupleWriter() {

+		return new ITupleWriter() {

+			byte newLine = "\n".getBytes()[0];

 

-            @Override

-            public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {

-            	try{

-            		for(int i = 0 ; i < 3 ; i++){

-            			byte[] data = tuple.getFieldData(0);

-            			int start = tuple.getFieldStart(0);

-            			int len = tuple.getFieldLength(0);

-            			output.write(data, start, len);

-            			output.writeChars(" ");

-            		}

-            		output.writeByte(newLine);

-            	} catch (Exception e) {

-                    throw new HyracksDataException(e);

-                }

-            }

+			@Override

+			public void write(DataOutput output, ITupleReference tuple)

+					throws HyracksDataException {

+				try {

+					for (int i = 0; i < 3; i++) {

+						// emit every field of the tuple, space-separated

+						byte[] data = tuple.getFieldData(i);

+						int start = tuple.getFieldStart(i);

+						int len = tuple.getFieldLength(i);

+						output.write(data, start, len);

+						// a single space byte; writeChars(" ") would emit

+						// UTF-16 and leak a NUL into the text file

+						output.writeByte(' ');

+					}

+					output.writeByte(newLine);

+				} catch (Exception e) {

+					throw new HyracksDataException(e);

+				}

+			}

 

-        };

-    }

+		};

+	}

 

 }

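For reference, why the writer above uses writeByte(' '): DataOutput.writeChars encodes every char as two bytes, high byte first, so writeChars(" ") puts a NUL byte in front of each space in what is meant to be a plain-text file. A self-contained check:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class WriteCharsDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream asChars = new ByteArrayOutputStream();
        new DataOutputStream(asChars).writeChars(" ");
        System.out.println(Arrays.toString(asChars.toByteArray())); // [0, 32] -- UTF-16, NUL first

        ByteArrayOutputStream asByte = new ByteArrayOutputStream();
        new DataOutputStream(asByte).writeByte(' ');
        System.out.println(Arrays.toString(asByte.toByteArray())); // [32]
    }
}
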
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java
index 1f6731d..efe054b 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java
@@ -22,233 +22,238 @@
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;

 

-public class KmerSplitOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {

+public class KmerSplitOperatorNodePushable extends

+		AbstractUnaryInputSinkOperatorNodePushable {

 

-    private final int k;

-    private long window;

+	private final int k;

+	private long window;

 

-    private final SplitFrame frameSorter;

+	private final SplitFrame frameSorter;

 

-    private FrameTupleAccessor accessor;

-    private ArrayTupleBuilder tupleBuilder;

-    private TaskId MytaskId;

-    private IHyracksTaskContext ctx;

+	private FrameTupleAccessor accessor;

+	private ArrayTupleBuilder tupleBuilder;

+	private TaskId MytaskId;

+	private IHyracksTaskContext ctx;

 

-    public KmerSplitOperatorNodePushable(IHyracksTaskContext ctx, int k, RecordDescriptor rd_in, int buffer_size,

-            TaskId taskid) {

+	public KmerSplitOperatorNodePushable(IHyracksTaskContext ctx, int k,

+			RecordDescriptor rd_in, int buffer_size, TaskId taskid) {

 

-        tupleBuilder = new ArrayTupleBuilder(3);

-        this.k = k;

+		tupleBuilder = new ArrayTupleBuilder(3);

+		this.k = k;

 

-        RecordDescriptor rd = new RecordDescriptor(new ISerializerDeserializer[] {

-                Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,

-                IntegerSerializerDeserializer.INSTANCE });

+		RecordDescriptor rd = new RecordDescriptor(

+				new ISerializerDeserializer[] {

+						Integer64SerializerDeserializer.INSTANCE,

+						ByteSerializerDeserializer.INSTANCE,

+						IntegerSerializerDeserializer.INSTANCE });

 

-        int[] sortFields = { 0 };

-        frameSorter = new SplitFrame(ctx, sortFields, new Integer64NormalizedKeyComputerFactory(),

-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) }, rd,

-                buffer_size);

+		int[] sortFields = { 0 };

+		frameSorter = new SplitFrame(

+				ctx,

+				sortFields,

+				new Integer64NormalizedKeyComputerFactory(),

+				new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+						.of(LongPointable.FACTORY) }, rd, buffer_size);

 

-        accessor = new FrameTupleAccessor(ctx.getFrameSize(), rd_in);

+		accessor = new FrameTupleAccessor(ctx.getFrameSize(), rd_in);

 

-        new FrameTupleAccessor(ctx.getFrameSize(), rd);

-        new FrameTupleAppender(ctx.getFrameSize());

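+		// note: the constructions below discard their results and have no

+		// effect; they appear to be leftovers from an earlier draft
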
+		new FrameTupleAccessor(ctx.getFrameSize(), rd);

+		new FrameTupleAppender(ctx.getFrameSize());

 

-        ByteBuffer.allocate(ctx.getFrameSize());

+		ByteBuffer.allocate(ctx.getFrameSize());

 

-        //initialize the window

-        window = 0;

-        for (int i = 0; i < k; i++) {

-            window <<= 2;

-            window |= 3;

-        }

+		// initialize the window: 2k one-bits, so a single AND drops the

+		// oldest base each time the k-mer slides

+		window = 0;

+		for (int i = 0; i < k; i++) {

+			window <<= 2;

+			window |= 3;

+		}

 

-        MytaskId = taskid;

-        this.ctx = ctx;

-    }

+		MytaskId = taskid;

+		this.ctx = ctx;

+	}

 

-    @Override

-    public void open() throws HyracksDataException {

-        // TODO Auto-generated method stub

-        //writer.open();

+	@Override

+	public void open() throws HyracksDataException {

+		// TODO Auto-generated method stub

+		// writer.open();

 

-    }

+	}

 

-    @Override

-    public void nextFrame(ByteBuffer buffer) {

-        accessor.reset(buffer);

-        int tupleCount = accessor.getTupleCount();

-        ByteBuffer temp_buf = accessor.getBuffer();

-        for (int i = 0; i < tupleCount; i++) {

-            int tupleStartOffset = accessor.getTupleStartOffset(i);

-            int fieldStartOffset = accessor.getFieldStartOffset(i, 0);

-            int loadLength = accessor.getFieldLength(i, 0);

-            //int loadLength = temp_buf.getInt(tupleStartOffset);

-            byte[] read = new byte[loadLength];

-            int slotLength = accessor.getFieldSlotsLength();

-            //temp_buf.position(tupleStartOffset+fieldStartOffset + accessor.getFieldSlotsLength());

-            int pos = tupleStartOffset + fieldStartOffset + slotLength;

-            //temp_buf

-            try {

-                temp_buf.position(pos);

-                temp_buf.get(read, 0, loadLength);

-                SplitReads(read);

-            } catch (Exception e) {

-                // TODO Auto-generated catch block

-                e.printStackTrace();

-            }

+	@Override

+	public void nextFrame(ByteBuffer buffer) {

+		accessor.reset(buffer);

+		int tupleCount = accessor.getTupleCount();

+		ByteBuffer temp_buf = accessor.getBuffer();

+		for (int i = 0; i < tupleCount; i++) {

+			int tupleStartOffset = accessor.getTupleStartOffset(i);

+			int fieldStartOffset = accessor.getFieldStartOffset(i, 0);

+			int loadLength = accessor.getFieldLength(i, 0);

+			// int loadLength = temp_buf.getInt(tupleStartOffset);

+			byte[] read = new byte[loadLength];

+			int slotLength = accessor.getFieldSlotsLength();

+			// temp_buf.position(tupleStartOffset+fieldStartOffset +

+			// accessor.getFieldSlotsLength());

+			int pos = tupleStartOffset + fieldStartOffset + slotLength;

+			// temp_buf

+			try {

+				temp_buf.position(pos);

+				temp_buf.get(read, 0, loadLength);

+				SplitReads(read);

+			} catch (Exception e) {

+				// TODO Auto-generated catch block

+				e.printStackTrace();

+			}

 

-            tupleBuilder.reset();

+			tupleBuilder.reset();

 

-        }

-    }

+		}

+	}

 

-    private long CompressKmer(byte[] array, int start, int k) {

-        // a: 00; c: 01; G: 10; T: 11

-        long l = 0;

-        for (int i = start; i < start + k; i++) {

-            l <<= 2;

-            switch (array[start + i]) {

-                case 'A':

-                case 'a':

-                    l |= 0;

-                    break;

-                case 'C':

-                case 'c':

-                    l |= 1;

-                    break;

-                case 'G':

-                case 'g':

-                    l |= 2;

-                    break;

-                case 'T':

-                case 't':

-                    l |= 3;

-                    break;

-            }

-        }

-        return l;

-    }

+	private long CompressKmer(byte[] array, int start, int k) {

+		// a: 00; c: 01; G: 10; T: 11

+		long l = 0;

+		for (int i = start; i < start + k; i++) {

+			l <<= 2;

+			switch (array[i]) { // i already includes `start`

+			case 'A':

+			case 'a':

+				l |= 0;

+				break;

+			case 'C':

+			case 'c':

+				l |= 1;

+				break;

+			case 'G':

+			case 'g':

+				l |= 2;

+				break;

+			case 'T':

+			case 't':

+				l |= 3;

+				break;

+			}

+		}

+		return l;

+	}

 

-    private byte GetBitmap(byte t) {

-        byte r = 0;

-        switch (t) {

-            case 'A':

-            case 'a':

-                r = 1;

-                break;

-            case 'C':

-            case 'c':

-                r = 2;

-                break;

-            case 'G':

-            case 'g':

-                r = 4;

-                break;

-            case 'T':

-            case 't':

-                r = 8;

-                break;

-        }

-        return r;

-    }

+	private byte GetBitmap(byte t) {

+		byte r = 0;

+		switch (t) {

+		case 'A':

+		case 'a':

+			r = 1;

+			break;

+		case 'C':

+		case 'c':

+			r = 2;

+			break;

+		case 'G':

+		case 'g':

+			r = 4;

+			break;

+		case 'T':

+		case 't':

+			r = 8;

+			break;

+		}

+		return r;

+	}

 

-    private byte ConvertSymbol(byte t) {

-        byte r = 0;

-        switch (t) {

-            case 'A':

-            case 'a':

-                r = 0;

-                break;

-            case 'C':

-            case 'c':

-                r = 1;

-                break;

-            case 'G':

-            case 'g':

-                r = 2;

-                break;

-            case 'T':

-            case 't':

-                r = 3;

-                break;

-        }

-        return r;

-    }

+	private byte ConvertSymbol(byte t) {

+		byte r = 0;

+		switch (t) {

+		case 'A':

+		case 'a':

+			r = 0;

+			break;

+		case 'C':

+		case 'c':

+			r = 1;

+			break;

+		case 'G':

+		case 'g':

+			r = 2;

+			break;

+		case 'T':

+		case 't':

+			r = 3;

+			break;

+		}

+		return r;

+	}

 

-    private void SplitReads(byte[] array) {

-        try {

-            long l = 0;

+	private void SplitReads(byte[] array) {

+		try {

+			long l = 0;

 

-            byte pre = 0, next = 0;

-            byte r;

+			byte pre = 0, next = 0;

+			byte r;

 

-            for (int i = 2; i < array.length - k + 1; i++) {

-                if (2 == i) {

-                    l = CompressKmer(array, i, k);

-                } else {

-                    l <<= 2;

-                    l &= window;

-                    l |= ConvertSymbol(array[i + k - 1]);

-                    pre = GetBitmap(array[i - 1]);

-                }

-                if (i + k != array.length) {

-                    next = GetBitmap(array[i + k]);

-                }

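+			// i starts at 2, presumably skipping the UTF8 field's 2-byte length prefix
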
+			for (int i = 2; i < array.length - k + 1; i++) {

+				if (i == 2) {

+					l = CompressKmer(array, i, k);

+				} else {

+					l <<= 2;

+					l &= window;

+					l |= ConvertSymbol(array[i + k - 1]);

+					pre = GetBitmap(array[i - 1]);

+				}

+				if (i + k != array.length) {

+					next = GetBitmap(array[i + k]);

+				}

 

-                r = 0;

-                r |= pre;

-                r <<= 4;

-                r |= next;

+				r = 0;

+				r |= pre;

+				r <<= 4;

+				r |= next;

 

-                /*System.out.print(l);

-                System.out.print(' ');

-                System.out.print(r);

-                System.out.println();*/

+				/*

+				 * System.out.print(l); System.out.print(' ');

+				 * System.out.print(r); System.out.println();

+				 */

 

-                frameSorter.insertKmer(l, r);

-            }

-        } catch (Exception e) {

-            e.printStackTrace();

-        }

-    }

+				frameSorter.insertKmer(l, r);

+			}

+		} catch (Exception e) {

+			e.printStackTrace();

+		}

+	}

 

-    @Override

-    public void fail() throws HyracksDataException {

-        // TODO Auto-generated method stub

+	@Override

+	public void fail() throws HyracksDataException {

+		// TODO Auto-generated method stub

 

-    }

+	}

 

-    @Override

-    public void close() {

-        // TODO Auto-generated method stub

-        try {

-            frameSorter.processLastFrame();

-            SplitTaskState state = new SplitTaskState(ctx.getJobletContext().getJobId(), MytaskId,

-                    frameSorter.GetRuns());

-            ctx.setStateObject(state);

+	@Override

+	public void close() {

+		// TODO Auto-generated method stub

+		try {

+			frameSorter.processLastFrame();

+			SplitTaskState state = new SplitTaskState(ctx.getJobletContext()

+					.getJobId(), MytaskId, frameSorter.GetRuns());

+			ctx.setStateObject(state);

 

-            //writer.close();

-        } catch (Exception e) {

-            // TODO Auto-generated catch block

-            e.printStackTrace();

-        }

-    }

+			// writer.close();

+		} catch (Exception e) {

+			// TODO Auto-generated catch block

+			e.printStackTrace();

+		}

+	}

 

-    public List<IFrameReader> GetRuns() {

-        return frameSorter.GetRuns();

-    }

+	public List<IFrameReader> GetRuns() {

+		return frameSorter.GetRuns();

+	}

 

-    //for debug

-    /*	private void DumpBlock(ByteBuffer f){

-    		

-    		int n = f.array().length/13;

-    		

-    		for(int i = 0 ; i < n ; i++){

-    			long t = LongPointable.getLong(f.array(), 13 * i);

-    			System.out.print(t);

-    			System.out.print(' ');

-    		}

-    		System.out.println();

-    	}*/

+	// for debug

+	/*

+	 * private void DumpBlock(ByteBuffer f){

+	 * 

+	 * int n = f.array().length/13;

+	 * 

+	 * for(int i = 0 ; i < n ; i++){ long t = LongPointable.getLong(f.array(),

+	 * 13 * i); System.out.print(t); System.out.print(' '); }

+	 * System.out.println(); }

+	 */

 }

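The kernel of KmerSplitOperatorNodePushable is the 2-bit packing in CompressKmer plus the shift/mask/OR slide in SplitReads. The standalone sketch below (plain Java, no Hyracks types, assuming k <= 31 so 2k bits fit in a long) runs the same arithmetic, with the window mask computed in closed form instead of the constructor's bit-by-bit loop:

public class KmerPackDemo {
    // Same encoding as ConvertSymbol: A=00, C=01, G=10, T=11.
    static long code(byte b) {
        switch (b) {
            case 'A': case 'a': return 0;
            case 'C': case 'c': return 1;
            case 'G': case 'g': return 2;
            case 'T': case 't': return 3;
            default: throw new IllegalArgumentException("not a base: " + (char) b);
        }
    }

    public static void main(String[] args) {
        byte[] read = "ACGTAC".getBytes();
        int k = 3;
        long window = (1L << (2 * k)) - 1; // equals the mask the constructor builds in its loop

        long kmer = 0;
        for (int i = 0; i < k; i++) { // CompressKmer: pack the first k bases
            kmer = (kmer << 2) | code(read[i]);
        }
        System.out.printf("%s -> %d%n", new String(read, 0, k), kmer);

        for (int i = k; i < read.length; i++) { // SplitReads: slide one base at a time
            kmer = ((kmer << 2) | code(read[i])) & window;
            System.out.printf("%s -> %d%n", new String(read, i - k + 1, k), kmer);
        }
    }
}

The order of operations in the slide matters: shift first, OR the new base into the low two bits, then AND with the window so the oldest base falls off the top.
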
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
index fb0fc18..9070a35 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
@@ -7,7 +7,6 @@
 import java.io.OutputStreamWriter;

 import java.nio.ByteBuffer;

 

-

 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;

 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;

 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;

@@ -20,149 +19,133 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;

 

-public class PrinterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {

- 

-    private static final long serialVersionUID = 1L;

-    private String filename;

-    private boolean writeFile;

-    private BufferedWriter twriter;

-    private FileOutputStream stream;

+public class PrinterOperatorDescriptor extends

+		AbstractSingleActivityOperatorDescriptor {

 

+	private static final long serialVersionUID = 1L;

+	private String filename;

+	private boolean writeFile;

+	private BufferedWriter twriter;

+	private FileOutputStream stream;

 

-    /**

-     * The constructor of HDFSWriteOperatorDescriptor.

-     * 

-     * @param spec

-     *            the JobSpecification object

-     * @param conf

-     *            the Hadoop JobConf which contains the output path

-     * @param tupleWriterFactory

-     *            the ITupleWriterFactory implementation object

-     * @throws HyracksException

-     */

-    public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec) {

-        super(spec, 1, 0);

-        writeFile = false;

-    }

-    

-    public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec, String filename) {

-        super(spec, 1, 0);

-        this.filename = filename;

-        writeFile = true;

-    }

+	/**

+	 * A debugging sink: prints every field of every incoming tuple, either

+	 * to stderr or, when a filename is given, to one text file per

+	 * partition.

+	 * 

+	 * @param spec

+	 *            the JobSpecification object

+	 */

+	public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec) {

+		super(spec, 1, 0);

+		writeFile = false;

+	}

 

-    @Override

-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,

-            final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)

-            throws HyracksDataException {

+	public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec,

+			String filename) {

+		super(spec, 1, 0);

+		this.filename = filename;

+		writeFile = true;

+	}

 

-        return new AbstractUnaryInputSinkOperatorNodePushable() {

-            private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;

-            private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);

-            private FrameTupleReference tuple = new FrameTupleReference();

+	@Override

+	public IOperatorNodePushable createPushRuntime(

+			final IHyracksTaskContext ctx,

+			final IRecordDescriptorProvider recordDescProvider,

+			final int partition, final int nPartitions)

+			throws HyracksDataException {

 

+		return new AbstractUnaryInputSinkOperatorNodePushable() {

+			private RecordDescriptor inputRd = recordDescProvider

+					.getInputRecordDescriptor(getActivityId(), 0);

+			private FrameTupleAccessor accessor = new FrameTupleAccessor(

+					ctx.getFrameSize(), inputRd);

+			private FrameTupleReference tuple = new FrameTupleReference();

 

-            @Override

-            public void open() throws HyracksDataException {

-            	if( true == writeFile){

-            		try {

-    					filename = filename + String.valueOf(partition)+".txt";

-    					//System.err.println(filename);

-            			stream = new FileOutputStream(filename);

-    				} catch (FileNotFoundException e) {

-    					// TODO Auto-generated catch block

-    					e.printStackTrace();

-    				}

-                    twriter = new BufferedWriter(new OutputStreamWriter(stream));

-            	}

-            }

+			@Override

+			public void open() throws HyracksDataException {

+				if (writeFile) {

+					try {

+						filename = filename + String.valueOf(partition)

+								+ ".txt";

+						// System.err.println(filename);

+						stream = new FileOutputStream(filename);

+					} catch (FileNotFoundException e) {

+						// TODO Auto-generated catch block

+						e.printStackTrace();

+					}

+					twriter = new BufferedWriter(new OutputStreamWriter(stream));

+				}

+			}

 

-            

-            private void PrintBytes(int no){

-                try{

-	            	

-	                byte[] bytes = tuple.getFieldData(no);

-	                int offset = tuple.getFieldStart(no);

-	                int length = tuple.getFieldLength(no);

-	                if(true == writeFile){

-	                	for(int j = offset ; j < offset + length ; j++){

-	                	   	twriter.write(String.valueOf((int)bytes[j]));

-	                	   	twriter.write(" ");

-	                	}

-	                	twriter.write("&&");

-	                }

-	                else{

-	                	for(int j = offset ; j < offset + length ;j++){

-	                	   	System.err.print(String.valueOf((int)bytes[j]));

-	                	   	System.err.print(" ");

-	                	}

-	                	System.err.print("&&");

-	                }

-                }

-	            catch(IOException e){

-	            	e.printStackTrace();

-	            }

-            }

-            

-            @Override

-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {

-                try{

-	            	accessor.reset(buffer);

-	                int tupleCount = accessor.getTupleCount();

-	                for (int i = 0; i < tupleCount; i++) {

-	                	tuple.reset(accessor, i);

-	                	int tj = tuple.getFieldCount();

-	                    for(int j = 0 ; j< tj ; j++){

-	                    	PrintBytes(j);

-	                    }

-	                    if(true == writeFile){

-	                    	twriter.write("\n");

-	                    }

-	                    else{

-	                    	System.err.println();

-	                    }

-	                }

-                }

-	            catch(IOException e){

-	            	e.printStackTrace();

-	            }

-            }

+			private void PrintBytes(int no) {

+				try {

 

-            @Override

-            public void fail() throws HyracksDataException {

+					byte[] bytes = tuple.getFieldData(no);

+					int offset = tuple.getFieldStart(no);

+					int length = tuple.getFieldLength(no);

+					if (writeFile) {

+						for (int j = offset; j < offset + length; j++) {

+							twriter.write(String.valueOf((int) bytes[j]));

+							twriter.write(" ");

+						}

+						twriter.write("&&");

+					} else {

+						for (int j = offset; j < offset + length; j++) {

+							System.err.print(String.valueOf((int) bytes[j]));

+							System.err.print(" ");

+						}

+						System.err.print("&&");

+					}

+				} catch (IOException e) {

+					e.printStackTrace();

+				}

+			}

 

-            }

+			@Override

+			public void nextFrame(ByteBuffer buffer)

+					throws HyracksDataException {

+				try {

+					accessor.reset(buffer);

+					int tupleCount = accessor.getTupleCount();

+					for (int i = 0; i < tupleCount; i++) {

+						tuple.reset(accessor, i);

+						int tj = tuple.getFieldCount();

+						for (int j = 0; j < tj; j++) {

+							PrintBytes(j);

+						}

+						if (writeFile) {

+							twriter.write("\n");

+						} else {

+							System.err.println();

+						}

+					}

+				} catch (IOException e) {

+					e.printStackTrace();

+				}

+			}

 

-            @Override

-            public void close() throws HyracksDataException {

-               	if( true == writeFile){

-            		try {

-            			twriter.close();

-    					stream.close();

-    				} catch (IOException e) {

-    					// TODO Auto-generated catch block

-    					e.printStackTrace();

-    				}

-            	}

-            }

+			@Override

+			public void fail() throws HyracksDataException {

 

-        };

-    }

+			}

+

+			@Override

+			public void close() throws HyracksDataException {

+				if (writeFile) {

+					try {

+						twriter.close();

+						stream.close();

+					} catch (IOException e) {

+						// TODO Auto-generated catch block

+						e.printStackTrace();

+					}

+				}

+			}

+

+		};

+	}

 }

-	

-	

-	

-	

-	

-	

-	

-	

-	

-	

-	

-	

-	

-	

-	

-	

-	
\ No newline at end of file
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
index 2d7ee70..60eaa35 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
@@ -17,216 +17,225 @@
 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;

 import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;

 

-public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {

-    private static final long serialVersionUID = 1L;

+public class ReadsKeyValueParserFactory implements

+		IKeyValueParserFactory<LongWritable, Text> {

+	private static final long serialVersionUID = 1L;

 

-    private int k;

-    private int byteNum;

-    

-    public ReadsKeyValueParserFactory(int k){

-    	this.k = k;

-        byteNum = (byte)Math.ceil((double)k/4.0);

-    }

-    @Override

-    public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {

-;

-        

-        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);

-        final ByteBuffer  outputBuffer = ctx.allocateFrame();

-        final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());

-        outputAppender.reset(outputBuffer, true);

-    	

+	private int k;

+	private int byteNum;

 

-        return new IKeyValueParser<LongWritable, Text>() {

+	public ReadsKeyValueParserFactory(int k) {

+		this.k = k;

+		byteNum = (int) Math.ceil(k / 4.0); // bytes needed at 4 bases per byte

+	}

 

-            @Override

-            public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {

-                String geneLine = value.toString(); // Read the Real Gene Line

-                Pattern genePattern = Pattern.compile("[AGCT]+");

-                Matcher geneMatcher = genePattern.matcher(geneLine);

-                boolean isValid = geneMatcher.matches();

-                if(isValid){

-                	SplitReads(geneLine.getBytes(), writer);

-                }

-            }

+	@Override

+	public IKeyValueParser<LongWritable, Text> createKeyValueParser(

+			final IHyracksTaskContext ctx) {

 

-            @Override

-            public void flush(IFrameWriter writer) throws HyracksDataException {

-                FrameUtils.flushFrame(outputBuffer, writer);

-            }

-            

-            

-            private byte[] CompressKmer(byte[] array, int start) {

-                // a: 00; c: 01; G: 10; T: 11

-            	

-            	byte[] bytes = new byte[byteNum+1];

-            	bytes[0] = (byte) k;

-            	

-            	byte l = 0;

-            	int count = 0;

-            	int bcount = 0;

-            	

-                for (int i = start; i < start + k; i++) {

-                    l <<= 2;

-                    switch (array[i]) {

-                        case 'A':

-                        case 'a':

-                            l |= 0;

-                            break;

-                        case 'C':

-                        case 'c':

-                            l |= 1;

-                            break;

-                        case 'G':

-                        case 'g':

-                            l |= 2;

-                            break;

-                        case 'T':

-                        case 't':

-                            l |= 3;

-                            break;

-                    }

-                    count += 2;

-                    if(count%8==0){

-                    	bcount += 1;

-                    	bytes[bcount] = l;

-                    	count = 0;

-                    }

-                }

-                bytes[bcount + 1] = l;

-                return bytes;

-            }

+		final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);

+		final ByteBuffer outputBuffer = ctx.allocateFrame();

+		final FrameTupleAppender outputAppender = new FrameTupleAppender(

+				ctx.getFrameSize());

+		outputAppender.reset(outputBuffer, true);

 

-            private byte GetBitmap(byte t) {

-                byte r = 0;

-                switch (t) {

-                    case 'A':

-                    case 'a':

-                        r = 1;

-                        break;

-                    case 'C':

-                    case 'c':

-                        r = 2;

-                        break;

-                    case 'G':

-                    case 'g':

-                        r = 4;

-                        break;

-                    case 'T':

-                    case 't':

-                        r = 8;

-                        break;

-                }

-                return r;

-            }

+		return new IKeyValueParser<LongWritable, Text>() {

 

-            private byte ConvertSymbol(byte t) {

-                byte r = 0;

-                switch (t) {

-                    case 'A':

-                    case 'a':

-                        r = 0;

-                        break;

-                    case 'C':

-                    case 'c':

-                        r = 1;

-                        break;

-                    case 'G':

-                    case 'g':

-                        r = 2;

-                        break;

-                    case 'T':

-                    case 't':

-                        r = 3;

-                        break;

-                }

-                return r;

-            }

-            

-            void MoveKmer(byte[] bytes, byte c){            	

-            	byte filter0 = (byte) 0xC0;

-            	byte filter1 = (byte) 0xFC;

-            	byte filter2 = 0;

-            	

-            	int r = byteNum*8 - 2*k;

-            	r = 8 - r;

-            	for(int i = 0 ; i < r ; i++){

-            		filter2 <<= 1;

-            		filter2 |= 1;

-            	}

+			@Override

+			public void parse(LongWritable key, Text value, IFrameWriter writer)

+					throws HyracksDataException {

+				String geneLine = value.toString(); // the read's sequence line

+				Pattern genePattern = Pattern.compile("[AGCT]+");

+				Matcher geneMatcher = genePattern.matcher(geneLine);

+				boolean isValid = geneMatcher.matches();

+				if (isValid) {

+					SplitReads(geneLine.getBytes(), writer);

+				}

+			}

 

-                int i = byteNum;

-                bytes[i] <<= 2;

-                bytes[i] &= filter2;

-                i -= 1;

-            	while(i > 0){

-            		byte f = (byte) (bytes[i] & filter0);

-            		f >>= 6;

-                	bytes[i+1] |= f;

-            		bytes[i] <<= 2;

-            		bytes[i] &= filter1;

-            	}

-            	bytes[i+1] |= ConvertSymbol(c);

-            }

+			@Override

+			public void flush(IFrameWriter writer) throws HyracksDataException {

+				FrameUtils.flushFrame(outputBuffer, writer);

+			}

 

-            private void SplitReads(byte[] array, IFrameWriter writer) {

-                try {

-                	byte[] bytes=null;

-                	

-                	byte pre = 0, next = 0;

-                    byte r;

+			private byte[] CompressKmer(byte[] array, int start) {

+				// a: 00; c: 01; G: 10; T: 11

 

-                    for (int i = 0; i < array.length - k + 1; i++) {

-                        if (0 == i) {

-                            bytes = CompressKmer(array, i);

-                        } else {

-                        	MoveKmer(bytes, array[i + k - 1]);

-                            /*l <<= 2;

-                            l &= window;

-                            l |= ConvertSymbol(array[i + k - 1]);*/

-                            pre = GetBitmap(array[i - 1]);

-                        }

-                        if (i + k != array.length) {

-                            next = GetBitmap(array[i + k]);

-                        }

+				byte[] bytes = new byte[byteNum + 1];

+				bytes[0] = (byte) k;

 

-                        r = 0;

-                        r |= pre;

-                        r <<= 4;

-                        r |= next;

+				byte l = 0;

+				int count = 0;

+				int bcount = 0;

 

-                        /*System.out.print(l);

-                        System.out.print(' ');

-                        System.out.print(r);

-                        System.out.println();*/

+				for (int i = start; i < start + k; i++) {

+					l <<= 2;

+					switch (array[i]) {

+					case 'A':

+					case 'a':

+						l |= 0;

+						break;

+					case 'C':

+					case 'c':

+						l |= 1;

+						break;

+					case 'G':

+					case 'g':

+						l |= 2;

+						break;

+					case 'T':

+					case 't':

+						l |= 3;

+						break;

+					}

+					count += 2;

+					if (count % 8 == 0) {

+						bcount += 1;

+						bytes[bcount] = l;

+						count = 0;

+					}

+				}

+				// when k % 4 == 0 the loop already flushed the last byte,

+				// and writing again would run off the end of bytes[]

+				if (count != 0) {

+					bytes[bcount + 1] = l;

+				}

+				return bytes;

+			}

 

-                        tupleBuilder.reset();

+			private byte GetBitmap(byte t) {

+				byte r = 0;

+				switch (t) {

+				case 'A':

+				case 'a':

+					r = 1;

+					break;

+				case 'C':

+				case 'c':

+					r = 2;

+					break;

+				case 'G':

+				case 'g':

+					r = 4;

+					break;

+				case 'T':

+				case 't':

+					r = 8;

+					break;

+				}

+				return r;

+			}

 

-                        //tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE, l);

-                        tupleBuilder.addField(bytes, 0, byteNum + 1);

-                        tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);

-                        

-                        

-                        //int[] a = tupleBuilder.getFieldEndOffsets();

-                        //int b = tupleBuilder.getSize();

-                        //byte[] c = tupleBuilder.getByteArray();

+			private byte ConvertSymbol(byte t) {

+				byte r = 0;

+				switch (t) {

+				case 'A':

+				case 'a':

+					r = 0;

+					break;

+				case 'C':

+				case 'c':

+					r = 1;

+					break;

+				case 'G':

+				case 'g':

+					r = 2;

+					break;

+				case 'T':

+				case 't':

+					r = 3;

+					break;

+				}

+				return r;

+			}

 

-                        if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,

-                                tupleBuilder.getSize())) {

-                            FrameUtils.flushFrame(outputBuffer, writer);

-                            outputAppender.reset(outputBuffer, true);

-                            if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),

-                                    0, tupleBuilder.getSize())) {

-                                throw new IllegalStateException(

-                                        "Failed to copy an record into a frame: the record size is too large.");

-                            }

-                        }

-                    }

-                } catch (Exception e) {

-                    throw new IllegalStateException(e);

-                }

-            }

-        };

-    }

+			void MoveKmer(byte[] bytes, byte c) {

+				byte filter0 = (byte) 0xC0;

+				byte filter1 = (byte) 0xFC;

+				byte filter2 = 0;

+

+				int r = byteNum * 8 - 2 * k;

+				r = 8 - r;

+				for (int i = 0; i < r; i++) {

+					filter2 <<= 1;

+					filter2 |= 1;

+				}

+

+				int i = byteNum;

+				bytes[i] <<= 2;

+				bytes[i] &= filter2;

+				i -= 1;

+				while (i > 0) {

+					byte f = (byte) ((bytes[i] >> 6) & 3); // take the top two

+					// bits; masking after the shift avoids sign extension

+					bytes[i + 1] |= f;

+					bytes[i] <<= 2;

+					bytes[i] &= filter1;

+					i -= 1; // step toward byte 0 so the loop terminates

+				}

+				bytes[i + 1] |= ConvertSymbol(c);

+			}

+

+			private void SplitReads(byte[] array, IFrameWriter writer) {

+				try {

+					byte[] bytes = null;

+

+					byte pre = 0, next = 0;

+					byte r;

+

+					for (int i = 0; i < array.length - k + 1; i++) {

+						if (i == 0) {

+							bytes = CompressKmer(array, i);

+						} else {

+							MoveKmer(bytes, array[i + k - 1]);

+							/*

+							 * l <<= 2; l &= window; l |= ConvertSymbol(array[i

+							 * + k - 1]);

+							 */

+							pre = GetBitmap(array[i - 1]);

+						}

+						if (i + k != array.length) {

+							next = GetBitmap(array[i + k]);

+						}

+

+						r = 0;

+						r |= pre;

+						r <<= 4;

+						r |= next;

+

+						/*

+						 * System.out.print(l); System.out.print(' ');

+						 * System.out.print(r); System.out.println();

+						 */

+

+						tupleBuilder.reset();

+

+						// tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE,

+						// l);

+						tupleBuilder.addField(bytes, 0, byteNum + 1);

+						tupleBuilder.addField(

+								ByteSerializerDeserializer.INSTANCE, r);

+

+						// int[] a = tupleBuilder.getFieldEndOffsets();

+						// int b = tupleBuilder.getSize();

+						// byte[] c = tupleBuilder.getByteArray();

+

+						if (!outputAppender.append(

+								tupleBuilder.getFieldEndOffsets(),

+								tupleBuilder.getByteArray(), 0,

+								tupleBuilder.getSize())) {

+							FrameUtils.flushFrame(outputBuffer, writer);

+							outputAppender.reset(outputBuffer, true);

+							if (!outputAppender.append(

+									tupleBuilder.getFieldEndOffsets(),

+									tupleBuilder.getByteArray(), 0,

+									tupleBuilder.getSize())) {

+								throw new IllegalStateException(

+										"Failed to copy a record into a frame: the record size is too large.");

+							}

+						}

+					}

+				} catch (Exception e) {

+					throw new IllegalStateException(e);

+				}

+			}

+		};

+	}

 

 }
\ No newline at end of file
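Once k exceeds 31 bases, 2k bits no longer fit in a long, which is what the byte[]-based CompressKmer/MoveKmer above are reaching for. MoveKmer as committed still has rough edges beyond the loop fix, so the following is not this branch's algorithm but a minimal consistent sketch of a multi-byte slide, keeping the k-mer right-aligned across the array (newest base in the low bits of the last byte):

import java.util.Arrays;

public class ByteKmerDemo {
    static int code(byte b) { // A=00, C=01, G=10, T=11, as in ConvertSymbol
        switch (b) {
            case 'A': case 'a': return 0;
            case 'C': case 'c': return 1;
            case 'G': case 'g': return 2;
            case 'T': case 't': return 3;
            default: throw new IllegalArgumentException("not a base: " + (char) b);
        }
    }

    // Shift the packed k-mer left by one base and append c.
    static void slide(byte[] kmer, int k, byte c) {
        for (int i = 0; i < kmer.length - 1; i++) {
            // carry the next byte's top two bits into this byte's vacated low bits
            kmer[i] = (byte) ((kmer[i] << 2) | ((kmer[i + 1] >> 6) & 3));
        }
        kmer[kmer.length - 1] = (byte) ((kmer[kmer.length - 1] << 2) | code(c));
        int topBits = 2 * k - 8 * (kmer.length - 1); // bits in use in kmer[0]
        kmer[0] &= (byte) ((1 << topBits) - 1);      // drop the oldest base
    }

    public static void main(String[] args) {
        int k = 5;
        byte[] read = "ACGTACG".getBytes();
        byte[] kmer = new byte[(2 * k + 7) / 8];
        for (int i = 0; i < read.length; i++) {
            slide(kmer, k, read[i]);
            if (i >= k - 1) {
                System.out.println(new String(read, i - k + 1, k) + " -> " + Arrays.toString(kmer));
            }
        }
    }
}

Each byte's top two bits carry into the byte on its left, mirroring one big left shift over the whole array; the final mask clears whatever the oldest base vacated.
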
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java
index 179c983..0c8bd07 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java
@@ -42,443 +42,456 @@
 

 public class SplitFrame {

 

-    private static int HASH_SIZE = 4096;

-    private final SerializableHashTable table;

-    private final TuplePointer tempPointer;

-    private ArrayTupleBuilder tupleBuilder;

-    private final int buf_size;

+	private static int HASH_SIZE = 4096;

+	private final SerializableHashTable table;

+	private final TuplePointer tempPointer;

+	private ArrayTupleBuilder tupleBuilder;

+	private final int buf_size;

 

-    private final IHyracksTaskContext ctx;

-    private final int[] sortFields;

-    private final INormalizedKeyComputer nkc;

-    private final IBinaryComparator[] comparators;

-    private final ByteBuffer[] buffers;

+	private final IHyracksTaskContext ctx;

+	private final int[] sortFields;

+	private final INormalizedKeyComputer nkc;

+	private final IBinaryComparator[] comparators;

+	private final ByteBuffer[] buffers;

 

-    private final FrameTupleAccessor fta1;

-    private final FrameTupleAccessor fta2;

+	private final FrameTupleAccessor fta1;

+	private final FrameTupleAccessor fta2;

 

-    private final FrameTupleAppender appender;

+	private final FrameTupleAppender appender;

 

-    private final ByteBuffer outFrame;

+	private final ByteBuffer outFrame;

 

-    private int dataFrameCount;

-    private int[] tPointers;

-    private int tupleCount;

-    private final List<IFrameReader> runs;

-    private int flushCount;

-    private RecordDescriptor recordDescriptor;

+	private int dataFrameCount;

+	private int[] tPointers;

+	private int tupleCount;

+	private final List<IFrameReader> runs;

+	private int flushCount;

+	private RecordDescriptor recordDescriptor;

 

-    private int FrameTupleCount;

+	private int FrameTupleCount;

 

-    public SplitFrame(IHyracksTaskContext ctx, int[] sortFields,

-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,

-            RecordDescriptor recordDescriptor, int buf_size) {

-        this.ctx = ctx;

-        this.sortFields = sortFields;

-        this.recordDescriptor = recordDescriptor;

+	public SplitFrame(IHyracksTaskContext ctx, int[] sortFields,

+			INormalizedKeyComputerFactory firstKeyNormalizerFactory,

+			IBinaryComparatorFactory[] comparatorFactories,

+			RecordDescriptor recordDescriptor, int buf_size) {

+		this.ctx = ctx;

+		this.sortFields = sortFields;

+		this.recordDescriptor = recordDescriptor;

 

-        nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();

-        comparators = new IBinaryComparator[comparatorFactories.length];

-        for (int i = 0; i < comparatorFactories.length; ++i) {

-            comparators[i] = comparatorFactories[i].createBinaryComparator();

-        }

-        fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);

-        fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);

-        appender = new FrameTupleAppender(ctx.getFrameSize());

-        outFrame = ctx.allocateFrame();

-        table = new SerializableHashTable(HASH_SIZE, ctx);

-        dataFrameCount = 0;

+		nkc = firstKeyNormalizerFactory == null ? null

+				: firstKeyNormalizerFactory.createNormalizedKeyComputer();

+		comparators = new IBinaryComparator[comparatorFactories.length];

+		for (int i = 0; i < comparatorFactories.length; ++i) {

+			comparators[i] = comparatorFactories[i].createBinaryComparator();

+		}

+		fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);

+		fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);

+		appender = new FrameTupleAppender(ctx.getFrameSize());

+		outFrame = ctx.allocateFrame();

+		table = new SerializableHashTable(HASH_SIZE, ctx);

+		dataFrameCount = 0;

 

-        tempPointer = new TuplePointer();

-        tupleBuilder = new ArrayTupleBuilder(3);

-        this.buf_size = buf_size;

-        buffers = new ByteBuffer[buf_size];

-        for (int i = 0; i < buf_size; i++) {

-            buffers[i] = ByteBuffer.allocate(ctx.getFrameSize());

-        }

-        appender.reset(buffers[0], true);

-        flushCount = 0;

-        runs = new LinkedList<IFrameReader>();

-        FrameTupleCount = 0;

-    }

+		tempPointer = new TuplePointer();

+		tupleBuilder = new ArrayTupleBuilder(3);

+		this.buf_size = buf_size;

+		buffers = new ByteBuffer[buf_size];

+		for (int i = 0; i < buf_size; i++) {

+			buffers[i] = ByteBuffer.allocate(ctx.getFrameSize());

+		}

+		appender.reset(buffers[0], true);

+		flushCount = 0;

+		runs = new LinkedList<IFrameReader>();

+		FrameTupleCount = 0;

+	}

 

-    public void reset() {

-        dataFrameCount = 0;

-        tupleCount = 0;

-        appender.reset(buffers[0], true);

-    }

+	public void reset() {

+		dataFrameCount = 0;

+		tupleCount = 0;

+		appender.reset(buffers[0], true);

+	}

 

-    public int getFrameCount() {

-        return dataFrameCount;

-    }

+	public int getFrameCount() {

+		return dataFrameCount;

+	}

 

-    private void SearchHashTable(long entry, TuplePointer dataPointer) {

-        int offset = 0;

-        int tp = (int) (entry % HASH_SIZE);

-        if (tp < 0) {

-            tp = -tp;

-        }

-        do {

-            table.getTuplePointer(tp, offset, dataPointer);// what is the offset mean?

-            if (dataPointer.frameIndex < 0 || dataPointer.tupleIndex < 0)

-                break;

-            int bIndex = dataPointer.frameIndex;

-            int tIndex = dataPointer.tupleIndex;

-            fta1.reset(buffers[bIndex]);

+	private void SearchHashTable(long entry, TuplePointer dataPointer) {

+		int offset = 0;

+		int tp = (int) (entry % HASH_SIZE);

+		if (tp < 0) {

+			tp = -tp;

+		}

+		do {
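+		// walk the bucket: stop at the sentinel (-1, -1) on a miss, or at a

+		// tuple whose first field equals `entry` on a hit
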

+			table.getTuplePointer(tp, offset, dataPointer);// what is the offset

+															// mean?

+			if (dataPointer.frameIndex < 0 || dataPointer.tupleIndex < 0)

+				break;

+			int bIndex = dataPointer.frameIndex;

+			int tIndex = dataPointer.tupleIndex;

+			fta1.reset(buffers[bIndex]);

 

-            /* System.out.print("a:");

-             System.out.print(tIndex);

-             System.out.print(" b");

-             System.out.print(fta1.getTupleCount());

-             System.out.println();*/

+			/*

+			 * System.out.print("a:"); System.out.print(tIndex);

+			 * System.out.print(" b"); System.out.print(fta1.getTupleCount());

+			 * System.out.println();

+			 */

 

-            int tupleOffset = fta1.getTupleStartOffset(tIndex);

-            int fieldOffset = fta1.getFieldStartOffset(tIndex, 0);

-            int slotLength = fta1.getFieldSlotsLength();

-            int pos = tupleOffset + fieldOffset + slotLength;

-            long l = buffers[bIndex].getLong(pos);

-            if (l == entry) {

-                break;

-            }

-            offset += 1;

-        } while (true);

-    }

+			int tupleOffset = fta1.getTupleStartOffset(tIndex);

+			int fieldOffset = fta1.getFieldStartOffset(tIndex, 0);

+			int slotLength = fta1.getFieldSlotsLength();

+			int pos = tupleOffset + fieldOffset + slotLength;

+			long l = buffers[bIndex].getLong(pos);

+			if (l == entry) {

+				break;

+			}

+			offset += 1;

+		} while (true);

+	}

 

-    private void InsertHashTable(long entry, int frame_id, int tuple_id) {

+	private void InsertHashTable(long entry, int frame_id, int tuple_id) {

 

-        tempPointer.frameIndex = frame_id;

-        tempPointer.tupleIndex = tuple_id;

+		tempPointer.frameIndex = frame_id;

+		tempPointer.tupleIndex = tuple_id;

 

-        //System.out.print(frame_id);

-        //System.out.print(' ');

-        //System.out.println(tuple_id);

+		// System.out.print(frame_id);

+		// System.out.print(' ');

+		// System.out.println(tuple_id);

 

-        int tp = (int) (entry % HASH_SIZE);

-        if (tp < 0) {

-            tp = -tp;

-        }

-        table.insert(tp, tempPointer);

+		int tp = (int) (entry % HASH_SIZE);

+		if (tp < 0) {

+			tp = -tp;

+		}

+		table.insert(tp, tempPointer);

 

-    }

+	}

 

-    public void insertKmer(long l, byte r) {

-        try {

-            SearchHashTable(l, tempPointer);

-            if (tempPointer.frameIndex != -1 && tempPointer.tupleIndex != -1) {

-                fta1.reset(buffers[tempPointer.frameIndex]);

-                int tStart = fta1.getTupleStartOffset(tempPointer.tupleIndex);

-                int f0StartRel = fta1.getFieldStartOffset(tempPointer.tupleIndex, 1);

-                int slotLength = fta1.getFieldSlotsLength();

-                int pos = f0StartRel + tStart + slotLength;

+	public void insertKmer(long l, byte r) {

+		try {

+			SearchHashTable(l, tempPointer);

+			if (tempPointer.frameIndex != -1 && tempPointer.tupleIndex != -1) {

+				fta1.reset(buffers[tempPointer.frameIndex]);
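+				// hit: the k-mer already has a tuple; OR the new adjacency bits

+				// into field 1 and bump the count held in field 2
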

+				int tStart = fta1.getTupleStartOffset(tempPointer.tupleIndex);

+				int f0StartRel = fta1.getFieldStartOffset(

+						tempPointer.tupleIndex, 1);

+				int slotLength = fta1.getFieldSlotsLength();

+				int pos = f0StartRel + tStart + slotLength;

 

-                buffers[tempPointer.frameIndex].array()[pos] |= r;

-                buffers[tempPointer.frameIndex].position(pos + 1);

-                int temp_int = buffers[tempPointer.frameIndex].getInt();

-                temp_int += 1;

-                buffers[tempPointer.frameIndex].position(pos + 1);

-                buffers[tempPointer.frameIndex].putInt(temp_int);

-            } else {

-                tupleBuilder.reset();

-                tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE, l);

-                tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);

-                tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, 1);

+				buffers[tempPointer.frameIndex].array()[pos] |= r;

+				buffers[tempPointer.frameIndex].position(pos + 1);

+				int temp_int = buffers[tempPointer.frameIndex].getInt();

+				temp_int += 1;

+				buffers[tempPointer.frameIndex].position(pos + 1);

+				buffers[tempPointer.frameIndex].putInt(temp_int);

+			} else {

+				tupleBuilder.reset();
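+				// miss: append a fresh tuple (k-mer, adjacency bits, count = 1)
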

+				tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE,

+						l);

+				tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);

+				tupleBuilder

+						.addField(IntegerSerializerDeserializer.INSTANCE, 1);

 

-                /*System.out.print(l);

-                System.out.print(' ');

-                System.out.print(r);

-                System.out.println();*/

-                boolean b = appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,

-                        tupleBuilder.getSize());

+				/*

+				 * System.out.print(l); System.out.print(' ');

+				 * System.out.print(r); System.out.println();

+				 */

+				boolean b = appender.append(tupleBuilder.getFieldEndOffsets(),

+						tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());

 

-                if (!b) {

-                    dataFrameCount++;

-                    FrameTupleCount = 0;

-                    if (dataFrameCount < buf_size) {

-                        appender.reset(buffers[dataFrameCount], true);

-                    } else {

-                        sortFrames();

-                        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(

-                                ExternalSortRunGenerator.class.getSimpleName());

-                        RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());

-                        writer.open();

-                        try {

-                            flushCount += 1;

-                            flushFrames(writer);

-                        } finally {

-                            writer.close();

-                        }

-                        runs.add(writer.createReader());

-                        dataFrameCount = 0;

-                        appender.reset(buffers[dataFrameCount], true);

-                    }

-                    boolean tb = appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,

-                            tupleBuilder.getSize());

-                    if (!tb) {

-                        throw new HyracksDataException(

-                                "Failed to copy an record into a frame: the record size is too large");

-                    }

-                }

-                InsertHashTable(l, dataFrameCount, FrameTupleCount);

-                FrameTupleCount += 1;

-            }

-        } catch (HyracksDataException e) {

-            // TODO Auto-generated catch block

-            e.printStackTrace();

-        }

-    }

+				if (!b) {

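+					// frame full: move to the next buffer, or once all buf_size

+					// buffers are in use, sort them and spill one run to disk
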
+					dataFrameCount++;

+					FrameTupleCount = 0;

+					if (dataFrameCount < buf_size) {

+						appender.reset(buffers[dataFrameCount], true);

+					} else {

+						sortFrames();

+						FileReference file = ctx.getJobletContext()

+								.createManagedWorkspaceFile(

+										ExternalSortRunGenerator.class

+												.getSimpleName());

+						RunFileWriter writer = new RunFileWriter(file,

+								ctx.getIOManager());

+						writer.open();

+						try {

+							flushCount += 1;

+							flushFrames(writer);

+						} finally {

+							writer.close();

+						}

+						runs.add(writer.createReader());

+						dataFrameCount = 0;

+						appender.reset(buffers[dataFrameCount], true);

+					}

+					boolean tb = appender.append(

+							tupleBuilder.getFieldEndOffsets(),

+							tupleBuilder.getByteArray(), 0,

+							tupleBuilder.getSize());

+					if (!tb) {

+						throw new HyracksDataException(

+								"Failed to copy a record into a frame: the record size is too large");

+					}

+				}

+				InsertHashTable(l, dataFrameCount, FrameTupleCount);

+				FrameTupleCount += 1;

+			}

+		} catch (HyracksDataException e) {

+			// TODO Auto-generated catch block

+			e.printStackTrace();

+		}

+	}

 

-    public void sortFrames() {

-        int nBuffers = dataFrameCount;

-        tupleCount = 0;

-        for (int i = 0; i < nBuffers; ++i) {

-            fta1.reset(buffers[i]);

-            tupleCount += fta1.getTupleCount();

-        }

-        int sfIdx = sortFields[0];

-        tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4] : tPointers;

-        int ptr = 0;

-        for (int i = 0; i < nBuffers; ++i) {

-            fta1.reset(buffers[i]);

-            int tCount = fta1.getTupleCount();

-            byte[] array = fta1.getBuffer().array();

-            for (int j = 0; j < tCount; ++j) {

-                int tStart = fta1.getTupleStartOffset(j);

-                int tEnd = fta1.getTupleEndOffset(j);

-                tPointers[ptr * 4] = i;

-                tPointers[ptr * 4 + 1] = tStart;

-                tPointers[ptr * 4 + 2] = tEnd;

-                int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);

-                int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);

-                int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();

-                tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);

-                ++ptr;

-            }

-        }

-        if (tupleCount > 0) {

-            sort(tPointers, 0, tupleCount);

-        }

+	public void sortFrames() {

+		int nBuffers = dataFrameCount;

+		tupleCount = 0;

+		for (int i = 0; i < nBuffers; ++i) {

+			fta1.reset(buffers[i]);

+			tupleCount += fta1.getTupleCount();

+		}

+		int sfIdx = sortFields[0];

+		tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4]

+				: tPointers;

+		int ptr = 0;

+		for (int i = 0; i < nBuffers; ++i) {

+			fta1.reset(buffers[i]);

+			int tCount = fta1.getTupleCount();

+			byte[] array = fta1.getBuffer().array();

+			for (int j = 0; j < tCount; ++j) {

+				int tStart = fta1.getTupleStartOffset(j);

+				int tEnd = fta1.getTupleEndOffset(j);

+				tPointers[ptr * 4] = i;

+				tPointers[ptr * 4 + 1] = tStart;

+				tPointers[ptr * 4 + 2] = tEnd;

+				int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);

+				int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);

+				int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();

+				tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array,

+						f0Start, f0EndRel - f0StartRel);

+				++ptr;

+			}

+		}

+		if (tupleCount > 0) {

+			sort(tPointers, 0, tupleCount);

+		}

 

-        DumpAllBuffers();

-        //point the pointer to the first one

-        dataFrameCount = 0;

-    }

+		DumpAllBuffers();

+		// rewind dataFrameCount so consumers start at the first frame

+		dataFrameCount = 0;

+	}
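
+	// NOTE: tPointers packs four ints per tuple:

+	// [frame index, tuple start, tuple end, normalized key]. The normalized

+	// key (nkc) lets the sort resolve most comparisons with a single int

+	// compare before falling back to the per-field binary comparators.

+	// Hypothetical accessors, shown only to document the layout:

+	//

+	//   private int frameOf(int t) { return tPointers[t * 4]; }

+	//   private int startOf(int t) { return tPointers[t * 4 + 1]; }

+	//   private int endOf(int t)   { return tPointers[t * 4 + 2]; }

+	//   private int nmkOf(int t)   { return tPointers[t * 4 + 3]; }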

 

-    public void flushFrames(IFrameWriter writer) throws HyracksDataException {

-        appender.reset(outFrame, true);

-        for (int ptr = 0; ptr < tupleCount; ++ptr) {

-            int i = tPointers[ptr * 4];

-            int tStart = tPointers[ptr * 4 + 1];

-            int tEnd = tPointers[ptr * 4 + 2];

-            ByteBuffer buffer = buffers[i];

-            fta1.reset(buffer);

-            if (!appender.append(fta1, tStart, tEnd)) {

-                FrameUtils.flushFrame(outFrame, writer);

-                appender.reset(outFrame, true);

-                if (!appender.append(fta1, tStart, tEnd)) {

-                    throw new IllegalStateException();

-                }

-            }

-        }

-        if (appender.getTupleCount() > 0) {

-            FrameUtils.flushFrame(outFrame, writer);

-        }

-    }

+	public void flushFrames(IFrameWriter writer) throws HyracksDataException {

+		appender.reset(outFrame, true);

+		for (int ptr = 0; ptr < tupleCount; ++ptr) {

+			int i = tPointers[ptr * 4];

+			int tStart = tPointers[ptr * 4 + 1];

+			int tEnd = tPointers[ptr * 4 + 2];

+			ByteBuffer buffer = buffers[i];

+			fta1.reset(buffer);

+			if (!appender.append(fta1, tStart, tEnd)) {

+				FrameUtils.flushFrame(outFrame, writer);

+				appender.reset(outFrame, true);

+				if (!appender.append(fta1, tStart, tEnd)) {

+					throw new IllegalStateException();

+				}

+			}

+		}

+		if (appender.getTupleCount() > 0) {

+			FrameUtils.flushFrame(outFrame, writer);

+		}

+	}
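
+	// NOTE: flushFrames() uses the standard Hyracks append/flush/retry

+	// idiom: if a tuple does not fit, flush the current output frame and

+	// retry once against an empty frame; a second failure means the tuple

+	// is larger than a whole frame, so no amount of flushing can help and

+	// the method fails fast with IllegalStateException.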

 

-    private void sort(int[] tPointers, int offset, int length) {

-        int m = offset + (length >> 1);

-        int mi = tPointers[m * 4];

-        int mj = tPointers[m * 4 + 1];

-        int mv = tPointers[m * 4 + 3];

+	private void sort(int[] tPointers, int offset, int length) {

+		int m = offset + (length >> 1);

+		int mi = tPointers[m * 4];

+		int mj = tPointers[m * 4 + 1];

+		int mv = tPointers[m * 4 + 3];

 

-        int a = offset;

-        int b = a;

-        int c = offset + length - 1;

-        int d = c;

-        while (true) {

-            while (b <= c) {

-                int cmp = compare(tPointers, b, mi, mj, mv);

-                if (cmp > 0) {

-                    break;

-                }

-                if (cmp == 0) {

-                    swap(tPointers, a++, b);

-                }

-                ++b;

-            }

-            while (c >= b) {

-                int cmp = compare(tPointers, c, mi, mj, mv);

-                if (cmp < 0) {

-                    break;

-                }

-                if (cmp == 0) {

-                    swap(tPointers, c, d--);

-                }

-                --c;

-            }

-            if (b > c)

-                break;

-            swap(tPointers, b++, c--);

-        }

+		int a = offset;

+		int b = a;

+		int c = offset + length - 1;

+		int d = c;

+		while (true) {

+			while (b <= c) {

+				int cmp = compare(tPointers, b, mi, mj, mv);

+				if (cmp > 0) {

+					break;

+				}

+				if (cmp == 0) {

+					swap(tPointers, a++, b);

+				}

+				++b;

+			}

+			while (c >= b) {

+				int cmp = compare(tPointers, c, mi, mj, mv);

+				if (cmp < 0) {

+					break;

+				}

+				if (cmp == 0) {

+					swap(tPointers, c, d--);

+				}

+				--c;

+			}

+			if (b > c)

+				break;

+			swap(tPointers, b++, c--);

+		}

 

-        int s;

-        int n = offset + length;

-        s = Math.min(a - offset, b - a);

-        vecswap(tPointers, offset, b - s, s);

-        s = Math.min(d - c, n - d - 1);

-        vecswap(tPointers, b, n - s, s);

+		int s;

+		int n = offset + length;

+		s = Math.min(a - offset, b - a);

+		vecswap(tPointers, offset, b - s, s);

+		s = Math.min(d - c, n - d - 1);

+		vecswap(tPointers, b, n - s, s);

 

-        if ((s = b - a) > 1) {

-            sort(tPointers, offset, s);

-        }

-        if ((s = d - c) > 1) {

-            sort(tPointers, n - s, s);

-        }

-    }

+		if ((s = b - a) > 1) {

+			sort(tPointers, offset, s);

+		}

+		if ((s = d - c) > 1) {

+			sort(tPointers, n - s, s);

+		}

+	}
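
+	// NOTE: sort() is the Bentley-McIlroy three-way quicksort used by the

+	// classic java.util.Arrays.sort(int[]): keys equal to the pivot are

+	// parked at both ends and folded back into the middle via vecswap(),

+	// and only the strictly-smaller and strictly-larger regions recurse.

+	// The pivot is simply the middle element, so adversarial inputs can

+	// degrade to O(n^2); median-of-three pivot selection would harden it.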

 

-    private void swap(int x[], int a, int b) {

-        for (int i = 0; i < 4; ++i) {

-            int t = x[a * 4 + i];

-            x[a * 4 + i] = x[b * 4 + i];

-            x[b * 4 + i] = t;

-        }

-    }

+	private void swap(int x[], int a, int b) {

+		for (int i = 0; i < 4; ++i) {

+			int t = x[a * 4 + i];

+			x[a * 4 + i] = x[b * 4 + i];

+			x[b * 4 + i] = t;

+		}

+	}

 

-    private void vecswap(int x[], int a, int b, int n) {

-        for (int i = 0; i < n; i++, a++, b++) {

-            swap(x, a, b);

-        }

-    }

+	private void vecswap(int x[], int a, int b, int n) {

+		for (int i = 0; i < n; i++, a++, b++) {

+			swap(x, a, b);

+		}

+	}

 

-    private int compare(int[] tPointers, int tp1, int tp2i, int tp2j, int tp2v) {

-        int i1 = tPointers[tp1 * 4];

-        int j1 = tPointers[tp1 * 4 + 1];

-        int v1 = tPointers[tp1 * 4 + 3];

-        if (v1 != tp2v) {

-            return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1;

-        }

-        int i2 = tp2i;

-        int j2 = tp2j;

-        ByteBuffer buf1 = buffers[i1];

-        ByteBuffer buf2 = buffers[i2];

-        byte[] b1 = buf1.array();

-        byte[] b2 = buf2.array();

-        fta1.reset(buf1);

-        fta2.reset(buf2);

-        for (int f = 0; f < comparators.length; ++f) {

-            int fIdx = sortFields[f];

-            int f1Start = fIdx == 0 ? 0 : buf1.getInt(j1 + (fIdx - 1) * 4);

-            int f1End = buf1.getInt(j1 + fIdx * 4);

-            int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;

-            int l1 = f1End - f1Start;

-            int f2Start = fIdx == 0 ? 0 : buf2.getInt(j2 + (fIdx - 1) * 4);

-            int f2End = buf2.getInt(j2 + fIdx * 4);

-            int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;

-            int l2 = f2End - f2Start;

-            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);

-            if (c != 0) {

-                return c;

-            }

-        }

-        return 0;

-    }

+	private int compare(int[] tPointers, int tp1, int tp2i, int tp2j, int tp2v) {

+		int i1 = tPointers[tp1 * 4];

+		int j1 = tPointers[tp1 * 4 + 1];

+		int v1 = tPointers[tp1 * 4 + 3];

+		if (v1 != tp2v) {

+			return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1

+					: 1;

+		}

+		int i2 = tp2i;

+		int j2 = tp2j;

+		ByteBuffer buf1 = buffers[i1];

+		ByteBuffer buf2 = buffers[i2];

+		byte[] b1 = buf1.array();

+		byte[] b2 = buf2.array();

+		fta1.reset(buf1);

+		fta2.reset(buf2);

+		for (int f = 0; f < comparators.length; ++f) {

+			int fIdx = sortFields[f];

+			int f1Start = fIdx == 0 ? 0 : buf1.getInt(j1 + (fIdx - 1) * 4);

+			int f1End = buf1.getInt(j1 + fIdx * 4);

+			int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;

+			int l1 = f1End - f1Start;

+			int f2Start = fIdx == 0 ? 0 : buf2.getInt(j2 + (fIdx - 1) * 4);

+			int f2End = buf2.getInt(j2 + fIdx * 4);

+			int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;

+			int l2 = f2End - f2Start;

+			int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);

+			if (c != 0) {

+				return c;

+			}

+		}

+		return 0;

+	}
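
+	// NOTE: the (v & 0xffffffffL) masking above compares the normalized

+	// keys as unsigned 32-bit values, matching the unsigned encoding that

+	// the normalized-key computers produce. If this code ever targets

+	// Java 8+, the same test can be written as:

+	//

+	//   if (v1 != tp2v) {

+	//       return Integer.compareUnsigned(v1, tp2v);

+	//   }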

 

-    public void close() {

-        //this.buffers.clear();

-    }

+	public void close() {

+		// this.buffers.clear();

+	}

 

-    public int getFlushCount() {

-        return flushCount;

-    }

+	public int getFlushCount() {

+		return flushCount;

+	}

 

-    /*public void AddRuns(RunFileWriter r){

-    	try {

-    		runs.add(r.createReader());

-    	} catch (HyracksDataException e) {

-    		// TODO Auto-generated catch block

-    		e.printStackTrace();

-    	}

-    }*/

+	/*

+	 * public void AddRuns(RunFileWriter r){ try { runs.add(r.createReader()); }

+	 * catch (HyracksDataException e) { // TODO Auto-generated catch block

+	 * e.printStackTrace(); } }

+	 */

 

-    public List<IFrameReader> GetRuns() {

-        return runs;

-    }

+	public List<IFrameReader> GetRuns() {

+		return runs;

+	}

 

-    private void DumpAllBuffers() {

-        FrameTupleAccessor tfa = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);

+	private void DumpAllBuffers() {

+		FrameTupleAccessor tfa = new FrameTupleAccessor(ctx.getFrameSize(),

+				recordDescriptor);

 

-        for (int i = 0; i < dataFrameCount; i++) {

-            tfa.reset(buffers[i]);

-            int t = tfa.getTupleCount();

-            for (int j = 0; j < t; j++) {

-                int tupleOffset = tfa.getTupleStartOffset(j);

+		for (int i = 0; i < dataFrameCount; i++) {

+			tfa.reset(buffers[i]);

+			int t = tfa.getTupleCount();

+			for (int j = 0; j < t; j++) {

+				int tupleOffset = tfa.getTupleStartOffset(j);

 

-                int r = tfa.getFieldStartOffset(j, 0);

-                int pos = tupleOffset + r + tfa.getFieldSlotsLength();

-                long l = buffers[i].getLong(pos);

-                System.out.print(l);

-                System.out.print(' ');

+				int r = tfa.getFieldStartOffset(j, 0);

+				int pos = tupleOffset + r + tfa.getFieldSlotsLength();

+				long l = buffers[i].getLong(pos);

+				System.out.print(l);

+				System.out.print(' ');

 

-                r = tfa.getFieldStartOffset(j, 1);

-                pos = tupleOffset + r + tfa.getFieldSlotsLength();

-                byte b = buffers[i].array()[pos];

-                System.out.print(b);

-                System.out.print(' ');

+				r = tfa.getFieldStartOffset(j, 1);

+				pos = tupleOffset + r + tfa.getFieldSlotsLength();

+				byte b = buffers[i].array()[pos];

+				System.out.print(b);

+				System.out.print(' ');

 

-                r = tfa.getFieldStartOffset(j, 2);

-                pos = tupleOffset + r + tfa.getFieldSlotsLength();

-                int o = buffers[i].getInt(pos);

-                System.out.print(o);

-                System.out.print(' ');

+				r = tfa.getFieldStartOffset(j, 2);

+				pos = tupleOffset + r + tfa.getFieldSlotsLength();

+				int o = buffers[i].getInt(pos);

+				System.out.print(o);

+				System.out.print(' ');

 

-                System.out.println();

-            }

-        }

-        System.out.println("---------------------------------");

-    }

+				System.out.println();

+			}

+		}

+		System.out.println("---------------------------------");

+	}
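
+	// NOTE: DumpAllBuffers() prints every tuple to System.out, assuming a

+	// fixed (long, byte, int) record layout, and sortFrames() calls it on

+	// every spill. That is fine for debugging but will dominate the wall

+	// clock that Tester.main() measures; a guard such as the hypothetical

+	// flag below keeps it out of timing runs:

+	//

+	//   private static final boolean DEBUG = false; // illustrative only

+	//   if (DEBUG) { DumpAllBuffers(); }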

 

-    //functions for dubugging

-    //    private void DumpBuffer(byte[] f) {

-    //        int n = f.length;

-    //

-    //        int count = 0;

-    //        for (int i = 0; i < n; i++) {

-    //            if (i % 13 == 0) {

-    //                if (count != 0) {

-    //                    System.out.print(")(");

-    //                } else {

-    //                    System.out.print("(");

-    //                }

-    //                System.out.print(count);

-    //                System.out.print(':');

-    //                count += 1;

-    //            }

-    //            System.out.print(f[i]);

-    //            System.out.print(' ');

-    //        }

-    //        System.out.println(')');

-    //    }

+	// functions for debugging

+	// private void DumpBuffer(byte[] f) {

+	// int n = f.length;

+	//

+	// int count = 0;

+	// for (int i = 0; i < n; i++) {

+	// if (i % 13 == 0) {

+	// if (count != 0) {

+	// System.out.print(")(");

+	// } else {

+	// System.out.print("(");

+	// }

+	// System.out.print(count);

+	// System.out.print(':');

+	// count += 1;

+	// }

+	// System.out.print(f[i]);

+	// System.out.print(' ');

+	// }

+	// System.out.println(')');

+	// }

 

-    public void processLastFrame() {

-        sortFrames();

-        FileReference file;

+	public void processLastFrame() {

+		sortFrames();

+		FileReference file;

 

-        DumpAllBuffers();

-        try {

-            file = ctx.getJobletContext().createManagedWorkspaceFile(ExternalSortRunGenerator.class.getSimpleName());

-            RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());

-            writer.open();

-            try {

-                flushCount += 1;

-                flushFrames(writer);

-            } finally {

-                writer.close();

-            }

-            runs.add(writer.createReader());

-        } catch (HyracksDataException e) {

-            // TODO Auto-generated catch block

-            e.printStackTrace();

-        }

-        //frameSorter.AddRuns((RunFileWriter) writer);

+		DumpAllBuffers();

+		try {

+			file = ctx.getJobletContext().createManagedWorkspaceFile(

+					ExternalSortRunGenerator.class.getSimpleName());

+			RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());

+			writer.open();

+			try {

+				flushCount += 1;

+				flushFrames(writer);

+			} finally {

+				writer.close();

+			}

+			runs.add(writer.createReader());

+		} catch (HyracksDataException e) {

+			// TODO Auto-generated catch block

+			e.printStackTrace();

+		}

+		// frameSorter.AddRuns((RunFileWriter) writer);

 

-    }

+	}
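
+	// NOTE: both processLastFrame() and the frame-append path above catch

+	// HyracksDataException and merely print the stack trace, so a failed

+	// spill silently yields an incomplete run list. Wrapping and rethrowing

+	// would let the framework fail the task instead, e.g.:

+	//

+	//   } catch (HyracksDataException e) {

+	//       throw new IllegalStateException("run generation failed", e);

+	//   }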

 }
\ No newline at end of file
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index 18b09b9..5e00d7e 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -7,7 +7,6 @@
 import edu.uci.ics.genomix.data.normalizers.VLongNormalizedKeyComputerFactory;

 import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;

 import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

-import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;

 import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;

 import edu.uci.ics.genomix.data.std.primitive.VLongPointable;

 import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;

@@ -47,314 +46,370 @@
 

 public class Tester {

 

-    private static final Logger LOGGER = Logger.getLogger(Tester.class.getName());

-    public static final String NC1_ID = "nc1";

-    public static final String NC2_ID = "nc2";

-    public static final String NC3_ID = "nc3";

-    public static final String NC4_ID = "nc4";

+	private static final Logger LOGGER = Logger.getLogger(Tester.class

+			.getName());

+	public static final String NC1_ID = "nc1";

+	public static final String NC2_ID = "nc2";

+	public static final String NC3_ID = "nc3";

+	public static final String NC4_ID = "nc4";

 

-    private static ClusterControllerService cc;

-    private static NodeControllerService nc1;

-    private static NodeControllerService nc2;

-    private static NodeControllerService nc3;

-    private static NodeControllerService nc4;

-    private static IHyracksClientConnection hcc;

+	private static ClusterControllerService cc;

+	private static NodeControllerService nc1;

+	private static NodeControllerService nc2;

+	private static NodeControllerService nc3;

+	private static NodeControllerService nc4;

+	private static IHyracksClientConnection hcc;

 

-    //private static final boolean DEBUG = true;

+	// private static final boolean DEBUG = true;

 

-    public static void main(String[] args) throws Exception {

+	public static void main(String[] args) throws Exception {

 

-        LOGGER.setLevel(Level.OFF);

+		LOGGER.setLevel(Level.OFF);

 

-        init();

+		init();

 

-        // Options options = new Options();

+		// Options options = new Options();

 

-        IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);

+		IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);

 

-        /*

-         * JobSpecification job =

-         * createJob(parseFileSplits(options.inFileCustomerSplits),

-         * parseFileSplits(options.inFileOrderSplits),

-         * parseFileSplits(options.outFileSplits), options.numJoinPartitions,

-         * options.algo, options.graceInputSize, options.graceRecordsPerFrame,

-         * options.graceFactor, options.memSize, options.tableSize,

-         * options.hasGroupBy);

-         */

+		/*

+		 * JobSpecification job =

+		 * createJob(parseFileSplits(options.inFileCustomerSplits),

+		 * parseFileSplits(options.inFileOrderSplits),

+		 * parseFileSplits(options.outFileSplits), options.numJoinPartitions,

+		 * options.algo, options.graceInputSize, options.graceRecordsPerFrame,

+		 * options.graceFactor, options.memSize, options.tableSize,

+		 * options.hasGroupBy);

+		 */

 

-        int k, page_num;

-        String file_name = args[0];

-        k = Integer.parseInt(args[1]);

-        page_num = Integer.parseInt(args[2]);

-        int type = Integer.parseInt(args[3]);

+		int k, page_num;

+		String file_name = args[0];

+		k = Integer.parseInt(args[1]);

+		page_num = Integer.parseInt(args[2]);

+		int type = Integer.parseInt(args[3]);

 

-        JobSpecification job = createJob(file_name, k, page_num, type);

+		JobSpecification job = createJob(file_name, k, page_num, type);

 

-        long start = System.currentTimeMillis();

-        JobId jobId = hcc.startJob("test", job);

-        hcc.waitForCompletion(jobId);

-        long end = System.currentTimeMillis();

-        System.err.println(start + " " + end + " " + (end - start));

-        

-    /*   

+		long start = System.currentTimeMillis();

+		JobId jobId = hcc.startJob("test", job);

+		hcc.waitForCompletion(jobId);

+		long end = System.currentTimeMillis();

+		System.err.println(start + " " + end + " " + (end - start));

 

-        String s = "g:\\data\\results.txt" ;

+		/*

+		 * 

+		 * String s = "g:\\data\\results.txt" ;

+		 * 

+		 * filenames = new FileOutputStream(s); // filenames = new

+		 * FileInputStream("filename.txt");

+		 * 

+		 * BufferedWriter writer = new BufferedWriter(new

+		 * OutputStreamWriter(filenames)); writer.write((int) (end-start));

+		 * writer.close();

+		 */

 

-        filenames = new FileOutputStream(s);

-        // filenames = new FileInputStream("filename.txt");

+	}

 

-        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(filenames));

-        writer.write((int) (end-start));

-        writer.close();*/

-        

-    }

+	public static void init() throws Exception {

+		CCConfig ccConfig = new CCConfig();

+		ccConfig.clientNetIpAddress = "127.0.0.1";

+		ccConfig.clientNetPort = 39000;

+		ccConfig.clusterNetIpAddress = "127.0.0.1";

+		ccConfig.clusterNetPort = 39001;

+		ccConfig.profileDumpPeriod = -1;

+		File outDir = new File("target/ClusterController");

+		outDir.mkdirs();

+		File ccRoot = File.createTempFile(Tester.class.getName(), ".data",

+				outDir);

+		ccRoot.delete();

+		ccRoot.mkdir();

+		ccConfig.ccRoot = ccRoot.getAbsolutePath();

+		cc = new ClusterControllerService(ccConfig);

+		cc.start();

+		ccConfig.defaultMaxJobAttempts = 0;

 

-    public static void init() throws Exception {

-        CCConfig ccConfig = new CCConfig();

-        ccConfig.clientNetIpAddress = "127.0.0.1";

-        ccConfig.clientNetPort = 39000;

-        ccConfig.clusterNetIpAddress = "127.0.0.1";

-        ccConfig.clusterNetPort = 39001;

-        ccConfig.profileDumpPeriod = -1;

-        File outDir = new File("target/ClusterController");

-        outDir.mkdirs();

-        File ccRoot = File.createTempFile(Tester.class.getName(), ".data", outDir);

-        ccRoot.delete();

-        ccRoot.mkdir();

-        ccConfig.ccRoot = ccRoot.getAbsolutePath();

-        cc = new ClusterControllerService(ccConfig);

-        cc.start();

-        ccConfig.defaultMaxJobAttempts = 0;

+		NCConfig ncConfig1 = new NCConfig();

+		ncConfig1.ccHost = "localhost";

+		ncConfig1.ccPort = 39001;

+		ncConfig1.clusterNetIPAddress = "127.0.0.1";

+		ncConfig1.dataIPAddress = "127.0.0.1";

+		ncConfig1.nodeId = NC1_ID;

+		nc1 = new NodeControllerService(ncConfig1);

+		nc1.start();

 

-        NCConfig ncConfig1 = new NCConfig();

-        ncConfig1.ccHost = "localhost";

-        ncConfig1.ccPort = 39001;

-        ncConfig1.clusterNetIPAddress = "127.0.0.1";

-        ncConfig1.dataIPAddress = "127.0.0.1";

-        ncConfig1.nodeId = NC1_ID;

-        nc1 = new NodeControllerService(ncConfig1);

-        nc1.start();

+		NCConfig ncConfig2 = new NCConfig();

+		ncConfig2.ccHost = "localhost";

+		ncConfig2.ccPort = 39001;

+		ncConfig2.clusterNetIPAddress = "127.0.0.1";

+		ncConfig2.dataIPAddress = "127.0.0.1";

+		ncConfig2.nodeId = NC2_ID;

+		nc2 = new NodeControllerService(ncConfig2);

+		nc2.start();

 

-        NCConfig ncConfig2 = new NCConfig();

-        ncConfig2.ccHost = "localhost";

-        ncConfig2.ccPort = 39001;

-        ncConfig2.clusterNetIPAddress = "127.0.0.1";

-        ncConfig2.dataIPAddress = "127.0.0.1";

-        ncConfig2.nodeId = NC2_ID;

-        nc2 = new NodeControllerService(ncConfig2);

-        nc2.start();

-        

-        NCConfig ncConfig3 = new NCConfig();

-        ncConfig3.ccHost = "localhost";

-        ncConfig3.ccPort = 39001;

-        ncConfig3.clusterNetIPAddress = "127.0.0.1";

-        ncConfig3.dataIPAddress = "127.0.0.1";

-        ncConfig3.nodeId = NC3_ID;

-        nc3 = new NodeControllerService(ncConfig3);

-        nc3.start();

-        

-        NCConfig ncConfig4 = new NCConfig();

-        ncConfig4.ccHost = "localhost";

-        ncConfig4.ccPort = 39001;

-        ncConfig4.clusterNetIPAddress = "127.0.0.1";

-        ncConfig4.dataIPAddress = "127.0.0.1";

-        ncConfig4.nodeId = NC4_ID;

-        nc4 = new NodeControllerService(ncConfig4);

-        nc4.start();

+		NCConfig ncConfig3 = new NCConfig();

+		ncConfig3.ccHost = "localhost";

+		ncConfig3.ccPort = 39001;

+		ncConfig3.clusterNetIPAddress = "127.0.0.1";

+		ncConfig3.dataIPAddress = "127.0.0.1";

+		ncConfig3.nodeId = NC3_ID;

+		nc3 = new NodeControllerService(ncConfig3);

+		nc3.start();

 

-        hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);

-        hcc.createApplication("test", null);

-        if (LOGGER.isLoggable(Level.INFO)) {

-            LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());

-        }

-    }

+		NCConfig ncConfig4 = new NCConfig();

+		ncConfig4.ccHost = "localhost";

+		ncConfig4.ccPort = 39001;

+		ncConfig4.clusterNetIPAddress = "127.0.0.1";

+		ncConfig4.dataIPAddress = "127.0.0.1";

+		ncConfig4.nodeId = NC4_ID;

+		nc4 = new NodeControllerService(ncConfig4);

+		nc4.start();

 

-    private static JobSpecification createJob(String filename, int k, int page_num, int type) throws HyracksDataException {

-        JobSpecification spec = new JobSpecification();

+		hcc = new HyracksConnection(ccConfig.clientNetIpAddress,

+				ccConfig.clientNetPort);

+		hcc.createApplication("test", null);

+		if (LOGGER.isLoggable(Level.INFO)) {

+			LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());

+		}

+	}

 

-        //spec.setFrameSize(32768);

-        spec.setFrameSize(32768);

 

-        FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);

+		// spec.setFrameSize(32768);

+		spec.setFrameSize(64);

 

-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {null, ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE});

-                //Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,

-                //ByteSerializerDeserializer.INSTANCE });

+		FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,

+				NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,

+		// NC1_ID);

 

-        int[] keyFields = new int[] { 0 };

-        int frameLimits = 4096;		// hyracks oriented

-        //int tableSize = 10485767;	// hyracks oriented

-        int tableSize = 2351137;	// hyracks oriented

 

-        AbstractOperatorDescriptor single_grouper;

-        IConnectorDescriptor conn_partition;

-        AbstractOperatorDescriptor cross_grouper;

+		int[] keyFields = new int[] { 0 };

+		int frameLimits = 4096; // hyracks oriented

+		int tableSize = 10485767; // hyracks oriented

 

-        

-        if(0 == type){//external group by

-            single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

-                    new VLongNormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

+		AbstractOperatorDescriptor single_grouper;

+		IConnectorDescriptor conn_partition;

+		AbstractOperatorDescriptor cross_grouper;

 

-                    new DistributedMergeLmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

-                    outputRec, new HashSpillableTableFactory(

-                            new FieldHashPartitionComputerFactory(keyFields,

-                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-                                            .of(VLongPointable.FACTORY) }), tableSize), true);

-        

-            conn_partition = new MToNPartitioningConnectorDescriptor(spec,

-                    new KmerHashPartitioncomputerFactory());

-            cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

-                    new VLongNormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

+		if (0 == type) { // external group-by

+			single_grouper = new ExternalGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new VLongNormalizedKeyComputerFactory(),

+					new MergeKmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

 

-                    new DistributedMergeLmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

-                    outputRec, new HashSpillableTableFactory(

-                            new FieldHashPartitionComputerFactory(keyFields,

-                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-                                            .of(VLongPointable.FACTORY) }), tableSize), true);

-        }

-        else if( 1 == type){

-            single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

-                    new VLongNormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

-                    new DistributedMergeLmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

-                    outputRec, new HashSpillableTableFactory(

-                            new FieldHashPartitionComputerFactory(keyFields,

-                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-                                            .of(VLongPointable.FACTORY) }), tableSize), true);

-            conn_partition = new  MToNPartitioningMergingConnectorDescriptor(spec, new KmerHashPartitioncomputerFactory(), 

-                  keyFields, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY)} );

-            cross_grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields, 

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },  

-                    new DistributedMergeLmerAggregateFactory(), 

-                    outputRec);

-        }

-        else{

-            long inputSizeInRawRecords = 154000000;

-            long inputSizeInUniqueKeys = 38500000;

-            int recordSizeInBytes = 4;

-            int hashfuncStartLevel = 1;

-            single_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

-                    new IBinaryHashFunctionFamily[] {new VLongBinaryHashFunctionFamily()},

-                    //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

-                    hashfuncStartLevel, 

-                    new VLongNormalizedKeyComputerFactory(),

-                    new MergeKmerAggregateFactory(),

-                    new DistributedMergeLmerAggregateFactory(),

-                    outputRec, true);

-            conn_partition = new MToNPartitioningConnectorDescriptor(spec,

-                    new KmerHashPartitioncomputerFactory());

-            recordSizeInBytes = 13;

-            cross_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

-                    new IBinaryHashFunctionFamily[] {new VLongBinaryHashFunctionFamily()},

-                    //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

-                    hashfuncStartLevel, 

-                    new VLongNormalizedKeyComputerFactory(),

-                    new DistributedMergeLmerAggregateFactory(),

-                    new DistributedMergeLmerAggregateFactory(),

-                    outputRec, true);            

-        }

-        

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

-        

-        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(spec);

-        spec.connect(readfileConn, scan, 0, single_grouper, 0);

-        

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					outputRec,

+					new HashSpillableTableFactory(

+							new FieldHashPartitionComputerFactory(

+									keyFields,

+									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+											.of(VLongPointable.FACTORY) }),

+							tableSize), true);

 

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper,NC1_ID);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

-        spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);

+			conn_partition = new MToNPartitioningConnectorDescriptor(spec,

+					new KmerHashPartitioncomputerFactory());

+			cross_grouper = new ExternalGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new VLongNormalizedKeyComputerFactory(),

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

 

-        //PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec, "G:\\data\\result");

-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					outputRec,

+					new HashSpillableTableFactory(

+							new FieldHashPartitionComputerFactory(

+									keyFields,

+									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+											.of(VLongPointable.FACTORY) }),

+							tableSize), true);

+		} else if (1 == type) {

+			single_grouper = new ExternalGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new VLongNormalizedKeyComputerFactory(),

+					new MergeKmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					outputRec,

+					new HashSpillableTableFactory(

+							new FieldHashPartitionComputerFactory(

+									keyFields,

+									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+											.of(VLongPointable.FACTORY) }),

+							tableSize), true);

+			conn_partition = new MToNPartitioningMergingConnectorDescriptor(

+					spec,

+					new KmerHashPartitioncomputerFactory(),

+					keyFields,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) });

+			cross_grouper = new PreclusteredGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new DistributedMergeLmerAggregateFactory(), outputRec);

+		} else {

+			long inputSizeInRawRecords = 154000000;

+			long inputSizeInUniqueKeys = 38500000;

+			int recordSizeInBytes = 4;

+			int hashfuncStartLevel = 1;

+			single_grouper = new HybridHashGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					inputSizeInRawRecords,

+					inputSizeInUniqueKeys,

+					recordSizeInBytes,

+					tableSize,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },

+					// new IBinaryHashFunctionFamily[]

+					// {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

+					hashfuncStartLevel,

+					new VLongNormalizedKeyComputerFactory(),

+					new MergeKmerAggregateFactory(),

+					new DistributedMergeLmerAggregateFactory(), outputRec, true);

+			conn_partition = new MToNPartitioningConnectorDescriptor(spec,

+					new KmerHashPartitioncomputerFactory());

+			recordSizeInBytes = 13;

+			cross_grouper = new HybridHashGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					inputSizeInRawRecords,

+					inputSizeInUniqueKeys,

+					recordSizeInBytes,

+					tableSize,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(VLongPointable.FACTORY) },

+					new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },

+					// new IBinaryHashFunctionFamily[]

+					// {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

+					hashfuncStartLevel,

+					new VLongNormalizedKeyComputerFactory(),

+					new DistributedMergeLmerAggregateFactory(),

+					new DistributedMergeLmerAggregateFactory(), outputRec, true);

+		}

 

-        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);

-        spec.connect(printConn, cross_grouper, 0, printer, 0);

-        //spec.connect(readfileConn, scan, 0, printer, 0);

+		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+		// single_grouper, NC1_ID);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+				single_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);

 

-        spec.addRoot(printer);

+		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(

+				spec);

+		spec.connect(readfileConn, scan, 0, single_grouper, 0);

 

-        if( 1 == type ){

-            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());

-        }

-        // System.out.println(spec.toString());

-        return spec;

-    }

+		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+		// cross_grouper,NC1_ID);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+				cross_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+		spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);

 

+		// PrinterOperatorDescriptor printer = new

+		// PrinterOperatorDescriptor(spec, "G:\\data\\result");

+		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,

+				NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+		// printer, NC1_ID);

 

-    static class JoinComparatorFactory implements ITuplePairComparatorFactory {

-        private static final long serialVersionUID = 1L;

+		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);

+		spec.connect(printConn, cross_grouper, 0, printer, 0);

+		// spec.connect(readfileConn, scan, 0, printer, 0);

 

-        private final IBinaryComparatorFactory bFactory;

-        private final int pos0;

-        private final int pos1;

+		spec.addRoot(printer);

 

-        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {

-            this.bFactory = bFactory;

-            this.pos0 = pos0;

-            this.pos1 = pos1;

-        }

+		if (1 == type) {

+			spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());

+		}

+		// System.out.println(spec.toString());

+		return spec;

+	}
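
+	// NOTE: type selects the aggregation plan: 0 chains two external hash

+	// group-bys through an M-to-N hash partitioner, 1 swaps the second

+	// stage for a sort-merging connector plus a preclustered group, and any

+	// other value runs hybrid-hash group-bys on both sides. Also note the

+	// frame size is set to 64 bytes above, which looks like a leftover

+	// debugging value next to the commented-out 32768. Invocation, per

+	// main() (file name, k, page_num, type; the file name is illustrative):

+	//

+	//   java edu.uci.ics.genomix.dataflow.Tester reads.fasta 25 100 0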

 

-        @Override

-        public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {

-            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);

-        }

-    }

+	static class JoinComparatorFactory implements ITuplePairComparatorFactory {

+		private static final long serialVersionUID = 1L;

 

-    static class JoinComparator implements ITuplePairComparator {

+		private final IBinaryComparatorFactory bFactory;

+		private final int pos0;

+		private final int pos1;

 

-        private final IBinaryComparator bComparator;

-        private final int field0;

-        private final int field1;

+		public JoinComparatorFactory(IBinaryComparatorFactory bFactory,

+				int pos0, int pos1) {

+			this.bFactory = bFactory;

+			this.pos0 = pos0;

+			this.pos1 = pos1;

+		}

 

-        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {

-            this.bComparator = bComparator;

-            this.field0 = field0;

-            this.field1 = field1;

-        }

+		@Override

+		public ITuplePairComparator createTuplePairComparator(

+				IHyracksTaskContext ctx) {

+			return new JoinComparator(bFactory.createBinaryComparator(), pos0,

+					pos1);

+		}

+	}

 

-        @Override

-        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {

-            int tStart0 = accessor0.getTupleStartOffset(tIndex0);

-            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;

+	static class JoinComparator implements ITuplePairComparator {

 

-            int tStart1 = accessor1.getTupleStartOffset(tIndex1);

-            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;

+		private final IBinaryComparator bComparator;

+		private final int field0;

+		private final int field1;

 

-            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);

-            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);

-            int fLen0 = fEnd0 - fStart0;

+		public JoinComparator(IBinaryComparator bComparator, int field0,

+				int field1) {

+			this.bComparator = bComparator;

+			this.field0 = field0;

+			this.field1 = field1;

+		}

 

-            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);

-            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);

-            int fLen1 = fEnd1 - fStart1;

+		@Override

+		public int compare(IFrameTupleAccessor accessor0, int tIndex0,

+				IFrameTupleAccessor accessor1, int tIndex1) {

+			int tStart0 = accessor0.getTupleStartOffset(tIndex0);

+			int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;

 

-            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1

-                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);

-            if (c != 0) {

-                return c;

-            }

-            return 0;

-        }

-    }

+			int tStart1 = accessor1.getTupleStartOffset(tIndex1);

+			int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;

+

+			int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);

+			int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);

+			int fLen0 = fEnd0 - fStart0;

+

+			int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);

+			int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);

+			int fLen1 = fEnd1 - fStart1;

+

+			int c = bComparator.compare(accessor0.getBuffer().array(), fStart0

+					+ fStartOffset0, fLen0, accessor1.getBuffer().array(),

+					fStart1 + fStartOffset1, fLen1);

+			if (c != 0) {

+				return c;

+			}

+			return 0;

+		}

+	}
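
+	// NOTE: the trailing "if (c != 0) return c; return 0;" in compare()

+	// simplifies to "return c;". These two join helper classes also appear

+	// unused by createJob(), which only wires scan, group-by, and printer

+	// operators, so they may be leftovers from an earlier experiment.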

 

 }

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index 792d033..c715dbd 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -10,160 +10,180 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;

 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

 

-public class DistributedMergeLmerAggregateFactory implements IAggregatorDescriptorFactory {

-    private static final long serialVersionUID = 1L;

-    private static final int max = 255;

+public class DistributedMergeLmerAggregateFactory implements

+		IAggregatorDescriptorFactory {

+	private static final long serialVersionUID = 1L;

+	private static final int max = 255;

 

-    public DistributedMergeLmerAggregateFactory() {

-    }

+	public DistributedMergeLmerAggregateFactory() {

+	}

 

-    @Override

-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,

-            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)

-            throws HyracksDataException {

-        return new IAggregatorDescriptor() {

+	@Override

+	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,

+			RecordDescriptor inRecordDescriptor,

+			RecordDescriptor outRecordDescriptor, int[] keyFields,

+			int[] keyFieldsInPartialResults) throws HyracksDataException {

+		return new IAggregatorDescriptor() {

 

-            @Override

-            public void reset() {

-            }

+			@Override

+			public void reset() {

+			}

 

-            @Override

-            public void close() {

-                // TODO Auto-generated method stub

+			@Override

+			public void close() {

+				// TODO Auto-generated method stub

 

-            }

+			}

 

-            @Override

-            public AggregateState createAggregateStates() {

-                // TODO Auto-generated method stub

-                return new AggregateState(new Object() {

-                });

-            }

+			@Override

+			public AggregateState createAggregateStates() {

+				// TODO Auto-generated method stub

+				return new AggregateState(new Object() {

+				});

+			}

 

-            @Override

-            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

-                    AggregateState state) throws HyracksDataException {

-                byte bitmap = 0;

-                byte count = 0;

-                int tupleOffset = accessor.getTupleStartOffset(tIndex);

-                int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

-                bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),

-                        tupleOffset + accessor.getFieldSlotsLength() + fieldStart);

+			@Override

+			public void init(ArrayTupleBuilder tupleBuilder,

+					IFrameTupleAccessor accessor, int tIndex,

+					AggregateState state) throws HyracksDataException {

+				byte bitmap = 0;

+				byte count = 0;

+				int tupleOffset = accessor.getTupleStartOffset(tIndex);

+				int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

+				bitmap |= ByteSerializerDeserializer.getByte(accessor

+						.getBuffer().array(),

+						tupleOffset + accessor.getFieldSlotsLength()

+								+ fieldStart);

 

-                tupleOffset = accessor.getTupleStartOffset(tIndex);

-                fieldStart = accessor.getFieldStartOffset(tIndex, 2);

-                int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();

+				tupleOffset = accessor.getTupleStartOffset(tIndex);

+				fieldStart = accessor.getFieldStartOffset(tIndex, 2);

+				int offset = tupleOffset + fieldStart

+						+ accessor.getFieldSlotsLength();

 

-                count += ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);

+				count += ByteSerializerDeserializer.getByte(accessor

+						.getBuffer().array(), offset);

 

-                DataOutput fieldOutput = tupleBuilder.getDataOutput();

-                try {

-                    fieldOutput.writeByte(bitmap);

-                    tupleBuilder.addFieldEndOffset();

-                    fieldOutput.writeByte(count);

-                    tupleBuilder.addFieldEndOffset();

-                } catch (IOException e) {

-                    throw new HyracksDataException("I/O exception when initializing the aggregator.");

-                }

+				DataOutput fieldOutput = tupleBuilder.getDataOutput();

+				try {

+					fieldOutput.writeByte(bitmap);

+					tupleBuilder.addFieldEndOffset();

+					fieldOutput.writeByte(count);

+					tupleBuilder.addFieldEndOffset();

+				} catch (IOException e) {

+					throw new HyracksDataException(

+							"I/O exception when initializing the aggregator.");

+				}

 

-            }

+			}

 

-            @Override

-            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,

-                    int stateTupleIndex, AggregateState state) throws HyracksDataException {

-                // TODO Auto-generated method stub

-               

-            	byte bitmap = 0;

-                byte count = 0;

+			@Override

+			public void aggregate(IFrameTupleAccessor accessor, int tIndex,

+					IFrameTupleAccessor stateAccessor, int stateTupleIndex,

+					AggregateState state) throws HyracksDataException {

+				// TODO Auto-generated method stub

 

-                int tupleOffset = accessor.getTupleStartOffset(tIndex);

-                int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

-                int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;

-                bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);

+				byte bitmap = 0;

+				byte count = 0;

 

-                tupleOffset = accessor.getTupleStartOffset(tIndex);

-                fieldStart = accessor.getFieldStartOffset(tIndex, 2);

-                offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();

-                count = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);

-                

+				int tupleOffset = accessor.getTupleStartOffset(tIndex);

+				int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

+				int offset = tupleOffset + accessor.getFieldSlotsLength()

+						+ fieldStart;

+				bitmap |= ByteSerializerDeserializer.getByte(accessor

+						.getBuffer().array(), offset);

 

-                int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);

-                int statefieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);

-                int stateoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + statefieldStart;

+				tupleOffset = accessor.getTupleStartOffset(tIndex);

+				fieldStart = accessor.getFieldStartOffset(tIndex, 2);

+				offset = tupleOffset + fieldStart

+						+ accessor.getFieldSlotsLength();

+				count = ByteSerializerDeserializer.getByte(accessor.getBuffer()

+						.array(), offset);

 

-                byte[] data = stateAccessor.getBuffer().array();

+				int statetupleOffset = stateAccessor

+						.getTupleStartOffset(stateTupleIndex);

+				int statefieldStart = stateAccessor.getFieldStartOffset(

+						stateTupleIndex, 1);

+				int stateoffset = statetupleOffset

+						+ stateAccessor.getFieldSlotsLength() + statefieldStart;

 

-                ByteBuffer buf = ByteBuffer.wrap(data);

-                bitmap |= buf.getChar(stateoffset);

-                buf.position(stateoffset+1);

-                count += buf.get();

-                

-                if(count > max){

-                	count = (byte) max;

-                }

-                

-                buf.put(stateoffset, bitmap);

-                buf.put(stateoffset + 1, count);

-            }

+				byte[] data = stateAccessor.getBuffer().array();

 

-            @Override

-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

-                    AggregateState state) throws HyracksDataException {

-                // TODO Auto-generated method stub

-                byte bitmap;

-                byte count;

-                DataOutput fieldOutput = tupleBuilder.getDataOutput();

-                byte[] data = accessor.getBuffer().array();

-                int tupleOffset = accessor.getTupleStartOffset(tIndex);

-                int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);

+				ByteBuffer buf = ByteBuffer.wrap(data);

+				bitmap |= buf.getChar(stateoffset);

+				buf.position(stateoffset + 1);

+				count += buf.get();

 

-                int offset = fieldOffset + accessor.getFieldSlotsLength() + tupleOffset;

-                bitmap = ByteSerializerDeserializer.getByte(data, offset);

+				if (count > max) {

+					count = (byte) max;

+				}

 

-                count = ByteSerializerDeserializer.getByte(data, offset + 1);

-                try {

-                    fieldOutput.writeByte(bitmap);

-                    tupleBuilder.addFieldEndOffset();

-                    fieldOutput.writeByte(count);

-                    tupleBuilder.addFieldEndOffset();

-                } catch (IOException e) {

-                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");

-                }

+				buf.put(stateoffset, bitmap);

+				buf.put(stateoffset + 1, count);

+			}

 

-            }

+			@Override

+			public void outputPartialResult(ArrayTupleBuilder tupleBuilder,

+					IFrameTupleAccessor accessor, int tIndex,

+					AggregateState state) throws HyracksDataException {

+				// TODO Auto-generated method stub

+				byte bitmap;

+				byte count;

+				DataOutput fieldOutput = tupleBuilder.getDataOutput();

+				byte[] data = accessor.getBuffer().array();

+				int tupleOffset = accessor.getTupleStartOffset(tIndex);

+				int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);

 

-            @Override

-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

-                    AggregateState state) throws HyracksDataException {

-                // TODO Auto-generated method stub

-                byte bitmap;

-                byte count;

+				int offset = fieldOffset + accessor.getFieldSlotsLength()

+						+ tupleOffset;

+				bitmap = ByteSerializerDeserializer.getByte(data, offset);

 

-                byte[] data = accessor.getBuffer().array();

-                int tupleOffset = accessor.getTupleStartOffset(tIndex);

-                int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);

-                int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldOffset;

+				count = ByteSerializerDeserializer.getByte(data, offset + 1);

+				try {

+					fieldOutput.writeByte(bitmap);

+					tupleBuilder.addFieldEndOffset();

+					fieldOutput.writeByte(count);

+					tupleBuilder.addFieldEndOffset();

+				} catch (IOException e) {

+					throw new HyracksDataException(

+							"I/O exception when writing aggregation to the output buffer.");

+				}

 

-                bitmap = ByteSerializerDeserializer.getByte(data, offset);

-                count = ByteSerializerDeserializer.getByte(data, offset + 1);

+			}

 

-                DataOutput fieldOutput = tupleBuilder.getDataOutput();

-                try {

-                    fieldOutput.writeByte(bitmap);

-                    tupleBuilder.addFieldEndOffset();

-                    fieldOutput.writeByte(count);

-                    tupleBuilder.addFieldEndOffset();

-                } catch (IOException e) {

-                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");

-                }

-            }

+			@Override

+			public void outputFinalResult(ArrayTupleBuilder tupleBuilder,

+					IFrameTupleAccessor accessor, int tIndex,

+					AggregateState state) throws HyracksDataException {

+				// TODO Auto-generated method stub

+				byte bitmap;

+				byte count;

 

-        };

-    }

+				byte[] data = accessor.getBuffer().array();

+				int tupleOffset = accessor.getTupleStartOffset(tIndex);

+				int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);

+				int offset = tupleOffset + accessor.getFieldSlotsLength()

+						+ fieldOffset;

+

+				bitmap = ByteSerializerDeserializer.getByte(data, offset);

+				count = ByteSerializerDeserializer.getByte(data, offset + 1);

+

+				DataOutput fieldOutput = tupleBuilder.getDataOutput();

+				try {

+					fieldOutput.writeByte(bitmap);

+					tupleBuilder.addFieldEndOffset();

+					fieldOutput.writeByte(count);

+					tupleBuilder.addFieldEndOffset();

+				} catch (IOException e) {

+					throw new HyracksDataException(

+							"I/O exception when writing aggregation to the output buffer.");

+				}

+			}

+

+		};

+	}

 

 }
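
Both aggregators in this patch emit their per-kmer state as two one-byte
fields through the tuple builder. Distilled into a standalone sketch (the
class and method names here are illustrative; the getDataOutput and
addFieldEndOffset calls are the ones used above):

import java.io.DataOutput;

import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

public final class EmitSketch {
	// Emit a (bitmap, count) pair as two one-byte fields, mirroring the
	// writeByte/addFieldEndOffset pattern of the output methods above.
	static void emit(ArrayTupleBuilder tupleBuilder, byte bitmap, byte count)
			throws Exception {
		DataOutput out = tupleBuilder.getDataOutput();
		out.writeByte(bitmap);
		tupleBuilder.addFieldEndOffset(); // seal the bitmap field
		out.writeByte(count);
		tupleBuilder.addFieldEndOffset(); // seal the count field
	}
}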

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index 1182cb1..b096887 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -10,153 +10,166 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;

-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

 import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;

 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

 

 public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {

-    private static final long serialVersionUID = 1L;

-    private static final int max =  255;

+	private static final long serialVersionUID = 1L;

+	private static final int max = 255;

 

-    public MergeKmerAggregateFactory() {

-    }

+	public MergeKmerAggregateFactory() {

+	}

 

-    @Override

-    public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,

-            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)

-            throws HyracksDataException {

-        return new IAggregatorDescriptor() {

+	@Override

+	public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,

+			RecordDescriptor inRecordDescriptor,

+			RecordDescriptor outRecordDescriptor, int[] keyFields,

+			int[] keyFieldsInPartialResults) throws HyracksDataException {

+		return new IAggregatorDescriptor() {

 

-            @Override

-            public void reset() {

-            }

+			@Override

+			public void reset() {

+			}

 

-            @Override

-            public void close() {

-                // TODO Auto-generated method stub

+			@Override

+			public void close() {

+				// nothing to release

 

-            }

+			}

 

-            @Override

-            public AggregateState createAggregateStates() {

-                // TODO Auto-generated method stub

-                return new AggregateState(new Object() {

-                });

-            }

+			@Override

+			public AggregateState createAggregateStates() {

+				// the running (bitmap, count) state lives inside the output

+				// frame itself, so only a placeholder state object is needed

+				return new AggregateState(new Object() {

+				});

+			}

 

-            @Override

-            public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

-                    AggregateState state) throws HyracksDataException {

-                byte bitmap = 0;

-                byte count = 0;

-                int tupleOffset = accessor.getTupleStartOffset(tIndex);

-                int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

-                                

-                bitmap |= accessor.getBuffer().get(tupleOffset + accessor.getFieldSlotsLength() 

-                                                       + fieldStart);

-                

-               	count += 1;

-               	

-                DataOutput fieldOutput = tupleBuilder.getDataOutput();

-                try {

-                    fieldOutput.writeByte(bitmap);

-                    tupleBuilder.addFieldEndOffset();

-                    fieldOutput.writeByte(count);

-                    tupleBuilder.addFieldEndOffset();

-                } catch (IOException e) {

-                    throw new HyracksDataException("I/O exception when initializing the aggregator.");

-                }

+			@Override

+			public void init(ArrayTupleBuilder tupleBuilder,

+					IFrameTupleAccessor accessor, int tIndex,

+					AggregateState state) throws HyracksDataException {

+				byte bitmap = 0;

+				byte count = 0;

+				int tupleOffset = accessor.getTupleStartOffset(tIndex);

+				int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

 

-            }

+				bitmap |= accessor.getBuffer().get(

+						tupleOffset + accessor.getFieldSlotsLength()

+								+ fieldStart);

 

-            @Override

-            public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,

-                    int stateTupleIndex, AggregateState state) throws HyracksDataException {

-                // TODO Auto-generated method stub

-                byte bitmap = 0;

-                byte count = 0;

+				count += 1;

 

-                int tupleOffset = accessor.getTupleStartOffset(tIndex);

-                int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

-                        

-                bitmap |= accessor.getBuffer().get(tupleOffset + accessor.getFieldSlotsLength() 

-                                                       + fieldStart);

+				DataOutput fieldOutput = tupleBuilder.getDataOutput();

+				try {

+					fieldOutput.writeByte(bitmap);

+					tupleBuilder.addFieldEndOffset();

+					fieldOutput.writeByte(count);

+					tupleBuilder.addFieldEndOffset();

+				} catch (IOException e) {

+					throw new HyracksDataException(

+							"I/O exception when initializing the aggregator.");

+				}

 

-                int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);

-                int statefieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);

-                int stateoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + statefieldStart;

-                

-                

-                count += 1;

+			}

 

-                byte[] data = stateAccessor.getBuffer().array();

+			@Override

+			public void aggregate(IFrameTupleAccessor accessor, int tIndex,

+					IFrameTupleAccessor stateAccessor, int stateTupleIndex,

+					AggregateState state) throws HyracksDataException {

+				// OR the input bitmap into the state and add the coverage counts

+				byte bitmap = 0;

+				byte count = 0;

 

-                ByteBuffer buf = ByteBuffer.wrap(data);

-                bitmap |= buf.getChar(stateoffset);

-                buf.position(stateoffset+1);

-                count +=  buf.get();

-                

-                if( count > max){

-                	count = (byte)max;

-                }

-                

-                buf.put(stateoffset, bitmap);

-                buf.put(stateoffset + 1, count);

-            }

+				int tupleOffset = accessor.getTupleStartOffset(tIndex);

+				int fieldStart = accessor.getFieldStartOffset(tIndex, 1);

 

-            @Override

-            public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

-                    AggregateState state) throws HyracksDataException {

-                // TODO Auto-generated method stub

-                byte bitmap;

-                byte count;

-                DataOutput fieldOutput = tupleBuilder.getDataOutput();

-                byte[] data = accessor.getBuffer().array();

-                int tupleOffset = accessor.getTupleStartOffset(tIndex);

-                int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);

+				bitmap |= accessor.getBuffer().get(

+						tupleOffset + accessor.getFieldSlotsLength()

+								+ fieldStart);

 

-                int offset = fieldOffset + accessor.getFieldSlotsLength() + tupleOffset;

-                bitmap = ByteSerializerDeserializer.getByte(data, offset);

-                count = ByteSerializerDeserializer.getByte(data, offset + 1);

-                try {

-                    fieldOutput.writeByte(bitmap);

-                    tupleBuilder.addFieldEndOffset();

-                    fieldOutput.writeByte(count);

-                    tupleBuilder.addFieldEndOffset();

-                } catch (IOException e) {

-                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");

-                }

+				int statetupleOffset = stateAccessor

+						.getTupleStartOffset(stateTupleIndex);

+				int statefieldStart = stateAccessor.getFieldStartOffset(

+						stateTupleIndex, 1);

+				int stateoffset = statetupleOffset

+						+ stateAccessor.getFieldSlotsLength() + statefieldStart;

 

-            }

+				count += 1;

 

-            @Override

-            public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,

-                    AggregateState state) throws HyracksDataException {

-                // TODO Auto-generated method stub

-                byte bitmap;

-                byte count;

+				byte[] data = stateAccessor.getBuffer().array();

 

-                byte[] data = accessor.getBuffer().array();

-                int tupleOffset = accessor.getTupleStartOffset(tIndex);

-                int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);

-                int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldOffset;

+				ByteBuffer buf = ByteBuffer.wrap(data);

+				// read the single bitmap byte; getChar() would read two

+				// bytes and fold the adjacent count byte into the bitmap

+				bitmap |= buf.get(stateoffset);

+				// widen to unsigned ints before adding so the saturation

+				// check below can actually fire

+				int newCount = (count & 0xFF)

+						+ (buf.get(stateoffset + 1) & 0xFF);

 

-                bitmap = ByteSerializerDeserializer.getByte(data, offset);

-                count = ByteSerializerDeserializer.getByte(data, offset + 1);

+				if (newCount > max) {

+					newCount = max;

+				}

+				count = (byte) newCount;

 

-                DataOutput fieldOutput = tupleBuilder.getDataOutput();

-                try {

-                    fieldOutput.writeByte(bitmap);

-                    tupleBuilder.addFieldEndOffset();

-                    fieldOutput.writeByte(count);

-                    tupleBuilder.addFieldEndOffset();

-                } catch (IOException e) {

-                    throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");

-                }

-            }

+				buf.put(stateoffset, bitmap);

+				buf.put(stateoffset + 1, count);

+			}

 

-        };

-    }

+			@Override

+			public void outputPartialResult(ArrayTupleBuilder tupleBuilder,

+					IFrameTupleAccessor accessor, int tIndex,

+					AggregateState state) throws HyracksDataException {

+				// pass the stored (bitmap, count) state through as the partial result

+				byte bitmap;

+				byte count;

+				DataOutput fieldOutput = tupleBuilder.getDataOutput();

+				byte[] data = accessor.getBuffer().array();

+				int tupleOffset = accessor.getTupleStartOffset(tIndex);

+				int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);

+

+				int offset = fieldOffset + accessor.getFieldSlotsLength()

+						+ tupleOffset;

+				bitmap = ByteSerializerDeserializer.getByte(data, offset);

+				count = ByteSerializerDeserializer.getByte(data, offset + 1);

+				try {

+					fieldOutput.writeByte(bitmap);

+					tupleBuilder.addFieldEndOffset();

+					fieldOutput.writeByte(count);

+					tupleBuilder.addFieldEndOffset();

+				} catch (IOException e) {

+					throw new HyracksDataException(

+							"I/O exception when writing aggregation to the output buffer.");

+				}

+

+			}

+

+			@Override

+			public void outputFinalResult(ArrayTupleBuilder tupleBuilder,

+					IFrameTupleAccessor accessor, int tIndex,

+					AggregateState state) throws HyracksDataException {

+				// emit the accumulated (bitmap, count) pair as the final result

+				byte bitmap;

+				byte count;

+

+				byte[] data = accessor.getBuffer().array();

+				int tupleOffset = accessor.getTupleStartOffset(tIndex);

+				int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);

+				int offset = tupleOffset + accessor.getFieldSlotsLength()

+						+ fieldOffset;

+

+				bitmap = ByteSerializerDeserializer.getByte(data, offset);

+				count = ByteSerializerDeserializer.getByte(data, offset + 1);

+

+				DataOutput fieldOutput = tupleBuilder.getDataOutput();

+				try {

+					fieldOutput.writeByte(bitmap);

+					tupleBuilder.addFieldEndOffset();

+					fieldOutput.writeByte(count);

+					tupleBuilder.addFieldEndOffset();

+				} catch (IOException e) {

+					throw new HyracksDataException(

+							"I/O exception when writing aggregation to the output buffer.");

+				}

+			}

+

+		};

+	}

 

 }
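
The aggregate step above folds one input tuple into a two-byte state, an
adjacency bitmap followed by a coverage count, updated in place inside the
grouper's frame. As a standalone sketch (names are illustrative; the
unsigned widening and the cap at 255 reflect the intent of the code above):

import java.nio.ByteBuffer;

public final class KmerStateMergeSketch {
	private static final int MAX = 255; // one unsigned byte

	// Fold one incoming (bitmap, count) contribution into the two-byte
	// state stored at stateOffset inside a frame buffer.
	static void merge(ByteBuffer frame, int stateOffset, byte inBitmap,
			int inCount) {
		byte bitmap = (byte) (frame.get(stateOffset) | inBitmap);
		// widen to an unsigned int so the saturation cap can fire
		int count = (frame.get(stateOffset + 1) & 0xFF) + inCount;
		if (count > MAX) {
			count = MAX;
		}
		frame.put(stateOffset, bitmap);
		frame.put(stateOffset + 1, (byte) count);
	}
}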

diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
index 6d0585e..927cd36 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -1,17 +1,12 @@
 package edu.uci.ics.genomix.driver;
 
-import java.io.IOException;
 import java.net.URL;
 import java.util.EnumSet;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.kohsuke.args4j.Option;
 
 import edu.uci.ics.genomix.job.GenomixJob;
 import edu.uci.ics.genomix.job.JobGen;
@@ -25,109 +20,122 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 public class Driver {
-   public static enum Plan {
-        BUILD_DEBRUJIN_GRAPH,
-        GRAPH_CLEANNING,
-        CONTIGS_GENERATION,
-    }
-   
-   	private static final String IS_PROFILING = "genomix.driver.profiling";
-   	private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
-   	private static final String applicationName = GenomixJob.JOB_NAME;
-   	private static final Log LOG = LogFactory.getLog(Driver.class);
-    private JobGen jobGen;
-    private boolean profiling;
-    
-    private int numPartitionPerMachine;
-    
-    private IHyracksClientConnection hcc;
-    
-    public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException{
-    	try{
-    		hcc = new HyracksConnection(ipAddress, port);
-    	} catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    	this.numPartitionPerMachine = numPartitionPerMachine;
-    }
-	
-    public void runJob(GenomixJob job) throws HyracksException {
-        runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
-    }
+	public static enum Plan {
+		BUILD_DEBRUJIN_GRAPH, GRAPH_CLEANNING, CONTIGS_GENERATION,
+	}
 
-    public void runJob(GenomixJob job, Plan planChoice, boolean profiling)
-            throws HyracksException {
-        /** add hadoop configurations */
-    	//TODO need to include the hadoophome to the classpath in the way below
-        URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
-        job.getConfiguration().addResource(hadoopCore);
-        URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
-        job.getConfiguration().addResource(hadoopMapRed);
-        URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
-        job.getConfiguration().addResource(hadoopHdfs);
+	private static final String IS_PROFILING = "genomix.driver.profiling";
+	private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
+	private static final String applicationName = GenomixJob.JOB_NAME;
+	private static final Log LOG = LogFactory.getLog(Driver.class);
+	private JobGen jobGen;
+	private boolean profiling;
 
-        LOG.info("job started");
-        long start = System.currentTimeMillis();
-        long end = start;
-        long time = 0;
+	private int numPartitionPerMachine;
 
-        this.profiling = profiling;
-        try {
-    		Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
-            switch (planChoice) {
-                case BUILD_DEBRUJIN_GRAPH:default:
-                    jobGen = new JobGenBrujinGraph(job, ncMap, numPartitionPerMachine);
-                    break;
-                case GRAPH_CLEANNING:
-                    jobGen = new JobGenGraphCleanning(job);
-                    break;
-                case CONTIGS_GENERATION:
-                    jobGen = new JobGenContigsGeneration(job);
-                    break;
-            }
-            
-            start = System.currentTimeMillis();
-            runCreate(jobGen);
-            end = System.currentTimeMillis();
-            time = end - start;
-            LOG.info("result writing finished " + time + "ms");
-            LOG.info("job finished");
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-    
-    private void runCreate(JobGen jobGen) throws Exception {
-        try {
-            JobSpecification createJob = jobGen.generateJob();
-            execute(createJob);
-        } catch (Exception e) {
-            throw e;
-        }
-    }
+	private IHyracksClientConnection hcc;
+	private Scheduler scheduler;
 
-    private void execute(JobSpecification job) throws Exception {
-        job.setUseConnectorPolicyForScheduling(false);
-        JobId jobId = hcc.startJob(applicationName, job,
-                profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
-        hcc.waitForCompletion(jobId);
-    }
-    
-    public static void main(String [] args) throws Exception{
-    	GenomixJob job = new GenomixJob();
-    	String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
-    	if ( otherArgs.length < 2){
-    		System.err.println("Need <serverIP> <port>");
-    		System.exit(-1);
-    	}
-    	String ipAddress = otherArgs[0];
-    	int port = Integer.parseInt(otherArgs[1]);
-    	int numOfDuplicate = job.getConfiguration().getInt(CPARTITION_PER_MACHINE, 2);
-    	boolean bProfiling = job.getConfiguration().getBoolean(IS_PROFILING, true);
-    	
-    	Driver driver = new Driver(ipAddress, port, numOfDuplicate);
-    	driver.runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
-    }
+	public Driver(String ipAddress, int port, int numPartitionPerMachine)
+			throws HyracksException {
+		try {
+			hcc = new HyracksConnection(ipAddress, port);
+			scheduler = new Scheduler(ipAddress, port);
+		} catch (Exception e) {
+			throw new HyracksException(e);
+		}
+		this.numPartitionPerMachine = numPartitionPerMachine;
+	}
+
+	public void runJob(GenomixJob job) throws HyracksException {
+		runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
+	}
+
+	public void runJob(GenomixJob job, Plan planChoice, boolean profiling)
+			throws HyracksException {
+		/** add hadoop configurations */
+		URL hadoopCore = job.getClass().getClassLoader()
+				.getResource("core-site.xml");
+		job.getConfiguration().addResource(hadoopCore);
+		URL hadoopMapRed = job.getClass().getClassLoader()
+				.getResource("mapred-site.xml");
+		job.getConfiguration().addResource(hadoopMapRed);
+		URL hadoopHdfs = job.getClass().getClassLoader()
+				.getResource("hdfs-site.xml");
+		job.getConfiguration().addResource(hadoopHdfs);
+
+		LOG.info("job started");
+		long start = System.currentTimeMillis();
+		long end = start;
+		long time = 0;
+
+		this.profiling = profiling;
+		try {
+			Map<String, NodeControllerInfo> ncMap = hcc
+					.getNodeControllerInfos();
+			switch (planChoice) {
+			case BUILD_DEBRUJIN_GRAPH:
+			default:
+				jobGen = new JobGenBrujinGraph(job, scheduler, ncMap,
+						numPartitionPerMachine);
+				break;
+			case GRAPH_CLEANNING:
+				jobGen = new JobGenGraphCleanning(job);
+				break;
+			case CONTIGS_GENERATION:
+				jobGen = new JobGenContigsGeneration(job);
+				break;
+			}
+
+			start = System.currentTimeMillis();
+			runCreate(jobGen);
+			end = System.currentTimeMillis();
+			time = end - start;
+			LOG.info("result writing finished in " + time + " ms");
+			LOG.info("job finished");
+		} catch (Exception e) {
+			throw new HyracksException(e);
+		}
+	}
+
+	private void runCreate(JobGen jobGen) throws Exception {
+		// let HyracksException from generateJob() propagate to the caller
+		JobSpecification createJob = jobGen.generateJob();
+		execute(createJob);
+	}
+
+	private void execute(JobSpecification job) throws Exception {
+		job.setUseConnectorPolicyForScheduling(false);
+		JobId jobId = hcc.startJob(
+				applicationName,
+				job,
+				profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
+						.noneOf(JobFlag.class));
+		hcc.waitForCompletion(jobId);
+	}
+
+	public static void main(String[] args) throws Exception {
+		GenomixJob job = new GenomixJob();
+		String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),
+				args).getRemainingArgs();
+		if (otherArgs.length < 2) {
+			System.err.println("Need <serverIP> <port>");
+			System.exit(-1);
+		}
+		String ipAddress = otherArgs[0];
+		int port = Integer.parseInt(otherArgs[1]);
+		int numOfDuplicate = job.getConfiguration().getInt(
+				CPARTITION_PER_MACHINE, 2);
+		boolean bProfiling = job.getConfiguration().getBoolean(IS_PROFILING,
+				true);
+
+		Driver driver = new Driver(ipAddress, port, numOfDuplicate);
+		driver.runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
+	}
 }
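
The substantive change in Driver is the hyracks-hdfs Scheduler built next
to the HyracksConnection and threaded into JobGenBrujinGraph, which uses
it to co-locate HDFS block reads with node controllers. A minimal sketch
of that handshake, using only calls that appear in this patch (the input
path and the CC address are placeholders):

import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;

public class SchedulerSketch {
	public static void main(String[] args) throws Exception {
		JobConf conf = new JobConf();
		conf.setInputFormat(TextInputFormat.class);
		FileInputFormat.setInputPaths(conf, "/example/input"); // placeholder

		// one split per target partition, as in createHDFSReader()
		InputSplit[] splits = conf.getInputFormat().getSplits(conf, 4);

		// ask the cluster controller which NC should read each split
		Scheduler scheduler = new Scheduler("localhost", 3099); // placeholder CC
		String[] readSchedule = scheduler.getLocationConstraints(splits);
		for (int i = 0; i < splits.length; i++) {
			System.out.println("split " + i + " -> " + readSchedule[i]);
		}
	}
}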
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
index 67527a7..bdc8202 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
@@ -6,7 +6,7 @@
 import org.apache.hadoop.mapreduce.Job;
 
 public class GenomixJob extends Job {
-	
+
 	public static final String JOB_NAME = "genomix";
 
 	/** Kmers length */
@@ -19,7 +19,6 @@
 	public static final String TABLE_SIZE = "genomix.tablesize";
 	/** Groupby types ? */
 	public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
-	
 
 	/** Configurations used by hybrid groupby function in graph build phrase */
 	public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
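
Since GenomixJob is just a Hadoop Job, these keys are set through the
ordinary Configuration API before the job reaches the Driver. A small
hedged example (the values shown are illustrative, not defaults taken
from the code):

import edu.uci.ics.genomix.job.GenomixJob;

public class GenomixConfigSketch {
	public static void main(String[] args) throws Exception {
		GenomixJob job = new GenomixJob();
		// both keys are read while building the graph; the values here
		// are illustrative only
		job.getConfiguration().setInt(GenomixJob.TABLE_SIZE, 10485767);
		job.getConfiguration().set(GenomixJob.GROUPBY_TYPE, "external");
	}
}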
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGen.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGen.java
index 66bf79d..6933541 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGen.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGen.java
@@ -4,11 +4,11 @@
 
 import org.apache.hadoop.conf.Configuration;
 
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 
 public abstract class JobGen {
-	
+
 	protected final Configuration conf;
 	protected final GenomixJob genomixJob;
 	protected String jobId = new UUID(System.currentTimeMillis(),
@@ -22,6 +22,6 @@
 
 	protected abstract void initJobConfiguration();
 
-	public abstract JobSpecification generateJob () throws HyracksDataException;
+	public abstract JobSpecification generateJob() throws HyracksException;
 
 }
\ No newline at end of file
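
The signature change above (HyracksDataException widened to
HyracksException) propagates to every JobGen subclass. A minimal
conforming subclass, assuming the single-argument constructor implied by
the super(job) calls elsewhere in this patch:

import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;

// assumes this class sits in edu.uci.ics.genomix.job next to JobGen
public class NoopJobGen extends JobGen {
	public NoopJobGen(GenomixJob job) {
		super(job);
	}

	@Override
	protected void initJobConfiguration() {
		// nothing to configure in this sketch
	}

	@Override
	public JobSpecification generateJob() throws HyracksException {
		return new JobSpecification(); // an empty, valid job
	}
}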
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index 067ad37..a1b28f7 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -3,6 +3,8 @@
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
@@ -11,8 +13,9 @@
 import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
 import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;
 import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.genomix.dataflow.FileScanDescriptor;
+import edu.uci.ics.genomix.dataflow.KMerWriterFactory;
 import edu.uci.ics.genomix.dataflow.PrinterOperatorDescriptor;
+import edu.uci.ics.genomix.dataflow.ReadsKeyValueParserFactory;
 import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
 import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
@@ -24,11 +27,11 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
@@ -39,6 +42,9 @@
 import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 public class JobGenBrujinGraph extends JobGen {
 	public enum GroupbyType {
@@ -46,8 +52,9 @@
 	}
 
 	private final Map<String, NodeControllerInfo> ncMap;
-	private String [] ncNodeNames;
-	
+	private Scheduler scheduler;
+	private String[] ncNodeNames;
+
 	private int kmers;
 	private int frameLimits;
 	private int tableSize;
@@ -60,20 +67,24 @@
 	private AbstractOperatorDescriptor crossGrouper;
 	private RecordDescriptor outputRec;
 
-	public JobGenBrujinGraph(GenomixJob job,
-			final Map<String, NodeControllerInfo> ncMap, int numPartitionPerMachine) {
+	public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
+			final Map<String, NodeControllerInfo> ncMap,
+			int numPartitionPerMachine) {
 		super(job);
 		this.ncMap = ncMap;
-		String [] nodes = new String[ncMap.size()];
+		this.scheduler = scheduler;
+		String[] nodes = new String[ncMap.size()];
 		ncMap.keySet().toArray(nodes);
 		ncNodeNames = new String[nodes.length * numPartitionPerMachine];
-		for (int i = 0; i < numPartitionPerMachine; i++){
-			System.arraycopy(nodes, 0, ncNodeNames, i*nodes.length, nodes.length);
+		for (int i = 0; i < numPartitionPerMachine; i++) {
+			System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length,
+					nodes.length);
 		}
 	}
 
 	private ExternalGroupOperatorDescriptor newExternalGroupby(
-			JobSpecification jobSpec, int[] keyFields, IAggregatorDescriptorFactory aggeragater) {
+			JobSpecification jobSpec, int[] keyFields,
+			IAggregatorDescriptorFactory aggeragater) {
 		return new ExternalGroupOperatorDescriptor(
 				jobSpec,
 				keyFields,
@@ -120,19 +131,18 @@
 			throws HyracksDataException {
 		int[] keyFields = new int[] { 0 }; // the id of grouped key
 
-		outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-				Integer64SerializerDeserializer.INSTANCE,
-				ByteSerializerDeserializer.INSTANCE,
-				ByteSerializerDeserializer.INSTANCE });
 		switch (groupbyType) {
 		case EXTERNAL:
-			singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+			singleGrouper = newExternalGroupby(jobSpec, keyFields,
+					new MergeKmerAggregateFactory());
 			connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
 					new KmerHashPartitioncomputerFactory());
-			crossGrouper = newExternalGroupby(jobSpec, keyFields,new DistributedMergeLmerAggregateFactory());
+			crossGrouper = newExternalGroupby(jobSpec, keyFields,
+					new DistributedMergeLmerAggregateFactory());
 			break;
 		case PRECLUSTER:
-			singleGrouper = newExternalGroupby(jobSpec, keyFields,new MergeKmerAggregateFactory());
+			singleGrouper = newExternalGroupby(jobSpec, keyFields,
+					new MergeKmerAggregateFactory());
 			connPartition = new MToNPartitioningMergingConnectorDescriptor(
 					jobSpec,
 					new KmerHashPartitioncomputerFactory(),
@@ -172,34 +182,58 @@
 		}
 	}
 
+	public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
+			throws HyracksDataException {
+		try {
+			outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+					null, ByteSerializerDeserializer.INSTANCE,
+					ByteSerializerDeserializer.INSTANCE });
+
+			InputSplit[] splits = ((JobConf) conf).getInputFormat().getSplits(
+					(JobConf) conf, ncNodeNames.length);
+
+			String[] readSchedule = scheduler.getLocationConstraints(splits);
+			return new HDFSReadOperatorDescriptor(jobSpec, outputRec,
+					(JobConf) conf, splits, readSchedule,
+					new ReadsKeyValueParserFactory(kmers));
+		} catch (Exception e) {
+			throw new HyracksDataException(e);
+		}
+	}
+
 	@Override
-	public JobSpecification generateJob() throws HyracksDataException {
-		
+	public JobSpecification generateJob() throws HyracksException {
+
 		JobSpecification jobSpec = new JobSpecification();
-		//File input
-		FileScanDescriptor scan = new FileScanDescriptor(jobSpec, kmers, inputPaths);		
-		
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, scan,ncNodeNames);
-		
+		// File input
+		HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				readOperator, ncNodeNames);
+
 		generateDescriptorbyType(jobSpec);
 		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
 				singleGrouper, ncNodeNames);
 
 		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
 				jobSpec);
-		jobSpec.connect(readfileConn, scan, 0, singleGrouper, 0);
+		jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
 
 		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
 				crossGrouper, ncNodeNames);
 		jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
 
-		//Output
-		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(jobSpec,
-				outputPath.getName());
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, printer,
-				ncNodeNames);
+		// Output
+		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(
+				jobSpec, outputPath.getName());
+		HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(
+				jobSpec, (JobConf) conf, new KMerWriterFactory());
 
-		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				printer, ncNodeNames);
+
+		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
+				jobSpec);
 		jobSpec.connect(printConn, crossGrouper, 0, printer, 0);
 		jobSpec.addRoot(printer);
 
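
The constructor above fans the node-controller list out
numPartitionPerMachine times so that each machine hosts several
read/group partitions. In isolation the duplication logic is just:

import java.util.Arrays;

public class NodeListSketch {
	public static void main(String[] args) {
		// replicate the NC list once per local partition, exactly as the
		// JobGenBrujinGraph constructor does
		String[] nodes = { "nc1", "nc2" };
		int numPartitionPerMachine = 2;
		String[] ncNodeNames = new String[nodes.length * numPartitionPerMachine];
		for (int i = 0; i < numPartitionPerMachine; i++) {
			System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length,
					nodes.length);
		}
		System.out.println(Arrays.toString(ncNodeNames)); // [nc1, nc2, nc1, nc2]
	}
}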
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
index d221834..6d30fad 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
@@ -17,7 +17,7 @@
 	@Override
 	protected void initJobConfiguration() {
 		// TODO Auto-generated method stub
-		
+
 	}
 
 }
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
index 6b7d98e..43b5a97 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
@@ -17,7 +17,7 @@
 	@Override
 	protected void initJobConfiguration() {
 		// TODO Auto-generated method stub
-		
+
 	}
 
 }
diff --git a/genomix/genomix-core/src/main/resources/conf/cluster.properties b/genomix/genomix-core/src/main/resources/conf/cluster.properties
index 5b2b757..77abcf5 100644
--- a/genomix/genomix-core/src/main/resources/conf/cluster.properties
+++ b/genomix/genomix-core/src/main/resources/conf/cluster.properties
@@ -25,6 +25,9 @@
 #The JAVA_HOME
 JAVA_HOME=$JAVA_HOME
 
+# Prepend HADOOP_HOME so Hadoop configuration is visible on the classpath
+CLASSPATH="${HADOOP_HOME}:${CLASSPATH}:."
+
 #The frame size of the internal dataflow engine
 FRAME_SIZE=65536
 
diff --git a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
index 5de25ba..8aad1b9 100644
--- a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
+++ b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
@@ -5,6 +5,7 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.commons.io.FileUtils;
@@ -12,102 +13,201 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.junit.Test;
+import org.apache.hadoop.mapred.TextInputFormat;
 
 import edu.uci.ics.genomix.driver.Driver;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
+import edu.uci.ics.hyracks.hdfs.lib.RawBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.hdfs.lib.TextKeyValueParserFactory;
+import edu.uci.ics.hyracks.hdfs.lib.TextTupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
+import edu.uci.ics.hyracks.hdfs.utils.TestUtils;
 
-public class JobRunTestCase extends TestCase{
-    private static final String ACTUAL_RESULT_DIR = "actual";
-    private static final String EXPECTED_RESULT_PATH = "src/test/resources/expected";
-    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+public class JobRunTestCase extends TestCase {
+	private static final String ACTUAL_RESULT_DIR = "actual";
+	private static final String EXPECTED_RESULT_PATH = "src/test/resources/expected";
+	private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
 
-    private static final String DATA_PATH = "src/test/resources/data/customer.tbl";
-    private static final String HDFS_INPUT_PATH = "/customer/";
-    private static final String HDFS_OUTPUT_PATH = "/customer_result/";
+	private static final String DATA_PATH = "src/test/resources/data/customer.tbl";
+	private static final String HDFS_INPUT_PATH = "/customer/";
+	private static final String HDFS_OUTPUT_PATH = "/customer_result/";
 
-    private static final String HYRACKS_APP_NAME = "genomix";
-    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
-    private MiniDFSCluster dfsCluster;
+	private static final String HYRACKS_APP_NAME = "genomix";
+	private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
+			+ File.separator + "conf.xml";
+	private MiniDFSCluster dfsCluster;
 
-    private JobConf conf = new JobConf();
-    private int numberOfNC = 2;
-    private int numPartitionPerMachine=2;
-    
-    private Driver myDriver;
+	private JobConf conf = new JobConf();
+	private int numberOfNC = 2;
+	private int numPartitionPerMachine = 2;
+
+	private Driver myDriver;
+
 	@Override
 	protected void setUp() throws Exception {
-        cleanupStores();
-        HyracksUtils.init();
-        HyracksUtils.createApp(HYRACKS_APP_NAME);
-        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
-        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
-        startHDFS();
-        
-        myDriver = new Driver(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
-    }
+		cleanupStores();
+		HyracksUtils.init();
+		HyracksUtils.createApp(HYRACKS_APP_NAME);
+		FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+		FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+		startHDFS();
 
-    private void cleanupStores() throws IOException {
-        FileUtils.forceMkdir(new File("teststore"));
-        FileUtils.forceMkdir(new File("build"));
-        FileUtils.cleanDirectory(new File("teststore"));
-        FileUtils.cleanDirectory(new File("build"));
-    }
+		myDriver = new Driver(HyracksUtils.CC_HOST,
+				HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT,
+				numPartitionPerMachine);
+	}
 
-    private void startHDFS() throws IOException {
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
-        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+	private void cleanupStores() throws IOException {
+		FileUtils.forceMkdir(new File("teststore"));
+		FileUtils.forceMkdir(new File("build"));
+		FileUtils.cleanDirectory(new File("teststore"));
+		FileUtils.cleanDirectory(new File("build"));
+	}
 
-        FileSystem lfs = FileSystem.getLocal(new Configuration());
-        lfs.delete(new Path("build"), true);
-        System.setProperty("hadoop.log.dir", "logs");
-        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
-        FileSystem dfs = FileSystem.get(conf);
-        Path src = new Path(DATA_PATH);
-        Path dest = new Path(HDFS_INPUT_PATH);
-        Path result = new Path(HDFS_OUTPUT_PATH);
-        dfs.mkdirs(dest);
-        dfs.mkdirs(result);
-        dfs.copyFromLocalFile(src, dest);
+	private void startHDFS() throws IOException {
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
 
-        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
-        conf.writeXml(confOutput);
-        confOutput.flush();
-        confOutput.close();
-    }
+		FileSystem lfs = FileSystem.getLocal(new Configuration());
+		lfs.delete(new Path("build"), true);
+		System.setProperty("hadoop.log.dir", "logs");
+		dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+		FileSystem dfs = FileSystem.get(conf);
+		Path src = new Path(DATA_PATH);
+		Path dest = new Path(HDFS_INPUT_PATH);
+		Path result = new Path(HDFS_OUTPUT_PATH);
+		dfs.mkdirs(dest);
+		dfs.mkdirs(result);
+		dfs.copyFromLocalFile(src, dest);
 
-    
+		DataOutputStream confOutput = new DataOutputStream(
+				new FileOutputStream(new File(HADOOP_CONF_PATH)));
+		conf.writeXml(confOutput);
+		confOutput.flush();
+		confOutput.close();
+	}
+
 	@Override
 	protected void runTest() throws Throwable {
 		TestExternalGroupby();
 		TestPreClusterGroupby();
 		TestHybridGroupby();
 	}
-	
-	void TestExternalGroupby() throws Exception{
-		// TODO
+
+	public void runHdfsJob() throws Throwable {
+
+		FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
+		FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
+		conf.setInputFormat(TextInputFormat.class);
+
+		Scheduler scheduler = new Scheduler(HyracksUtils.CC_HOST,
+				HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
+		InputSplit[] splits = conf.getInputFormat().getSplits(conf,
+				numberOfNC * 4);
+
+		String[] readSchedule = scheduler.getLocationConstraints(splits);
+		JobSpecification jobSpec = new JobSpecification();
+		RecordDescriptor recordDesc = new RecordDescriptor(
+				new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+		String[] locations = new String[] { HyracksUtils.NC1_ID,
+				HyracksUtils.NC1_ID, HyracksUtils.NC2_ID, HyracksUtils.NC2_ID };
+		HDFSReadOperatorDescriptor readOperator = new HDFSReadOperatorDescriptor(
+				jobSpec, recordDesc, conf, splits, readSchedule,
+				new TextKeyValueParserFactory());
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				readOperator, locations);
+
+		ExternalSortOperatorDescriptor sortOperator = new ExternalSortOperatorDescriptor(
+				jobSpec,
+				10,
+				new int[] { 0 },
+				new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE },
+				recordDesc);
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				sortOperator, locations);
+
+		HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(
+				jobSpec, conf, new TextTupleWriterFactory());
+		PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+				writeOperator, HyracksUtils.NC1_ID);
+
+		jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator,
+				0, sortOperator, 0);
+		jobSpec.connect(
+				new MToNPartitioningMergingConnectorDescriptor(
+						jobSpec,
+						new FieldHashPartitionComputerFactory(
+								new int[] { 0 },
+								new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }),
+						new int[] { 0 },
+						new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }),
+				sortOperator, 0, writeOperator, 0);
+		jobSpec.addRoot(writeOperator);
+
+		IHyracksClientConnection client = new HyracksConnection(
+				HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
+		JobId jobId = client.startJob(HYRACKS_APP_NAME, jobSpec);
+		client.waitForCompletion(jobId);
+
+		Assert.assertEquals(true, checkResults());
 	}
-	
-	void TestPreClusterGroupby() throws Exception{
-		// TODO
+
+	void TestExternalGroupby() throws Exception {
 	}
-	
-	void TestHybridGroupby() throws Exception{
+
+	void TestPreClusterGroupby() throws Exception {
 		// TODO
 	}
 
-	
+	void TestHybridGroupby() throws Exception {
+		// TODO
+	}
+
+	private boolean checkResults() throws Exception {
+		FileSystem dfs = FileSystem.get(conf);
+		Path result = new Path(HDFS_OUTPUT_PATH);
+		Path actual = new Path(ACTUAL_RESULT_DIR);
+		dfs.copyToLocalFile(result, actual);
+
+		TestUtils.compareWithResult(new File(EXPECTED_RESULT_PATH
+				+ File.separator + "part-0"), new File(ACTUAL_RESULT_DIR
+				+ File.separator + "customer_result" + File.separator
+				+ "part-0"));
+		return true;
+	}
+
 	@Override
 	protected void tearDown() throws Exception {
-        HyracksUtils.destroyApp(HYRACKS_APP_NAME);
-        HyracksUtils.deinit();
-        cleanupHDFS();
+		HyracksUtils.destroyApp(HYRACKS_APP_NAME);
+		HyracksUtils.deinit();
+		cleanupHDFS();
 	}
-	
-    private void cleanupHDFS() throws Exception {
-        dfsCluster.shutdown();
-    }
-	
+
+	private void cleanupHDFS() throws Exception {
+		dfsCluster.shutdown();
+	}
+
 }
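
checkResults() copies the HDFS output back into the local actual
directory and delegates the comparison to TestUtils.compareWithResult. As
a rough stand-in for what such a line-by-line comparison does (the real
hyracks-hdfs helper may differ in details):

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;

public final class CompareSketch {
	static void compare(File expected, File actual) throws Exception {
		try (BufferedReader e = new BufferedReader(new FileReader(expected));
				BufferedReader a = new BufferedReader(new FileReader(actual))) {
			String el;
			int line = 1;
			while ((el = e.readLine()) != null) {
				String al = a.readLine();
				if (!el.equals(al)) {
					throw new AssertionError("mismatch at line " + line
							+ ": expected <" + el + "> but was <" + al + ">");
				}
				line++;
			}
			if (a.readLine() != null) {
				throw new AssertionError("actual file has extra lines");
			}
		}
	}
}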
diff --git a/genomix/genomix-core/src/test/resources/data/customer.tbl b/genomix/genomix-core/src/test/resources/data/customer.tbl
new file mode 100644
index 0000000..5d39c80
--- /dev/null
+++ b/genomix/genomix-core/src/test/resources/data/customer.tbl
@@ -0,0 +1,150 @@
+1|Customer#000000001|IVhzIApeRb ot,c,E|15|25-989-741-2988|711.56|BUILDING|to the even, regular platelets. regular, ironic epitaphs nag e|
+2|Customer#000000002|XSTf4,NCwDVaWNe6tEgvwfmRchLXak|13|23-768-687-3665|121.65|AUTOMOBILE|l accounts. blithely ironic theodolites integrate boldly: caref|
+3|Customer#000000003|MG9kdTD2WBHm|1|11-719-748-3364|7498.12|AUTOMOBILE| deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov|
+4|Customer#000000004|XxVSJsLAGtn|4|14-128-190-5944|2866.83|MACHINERY| requests. final, regular ideas sleep final accou|
+5|Customer#000000005|KvpyuHCplrB84WgAiGV6sYpZq7Tj|3|13-750-942-6364|794.47|HOUSEHOLD|n accounts will have to unwind. foxes cajole accor|
+6|Customer#000000006|sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn|20|30-114-968-4951|7638.57|AUTOMOBILE|tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious|
+7|Customer#000000007|TcGe5gaZNgVePxU5kRrvXBfkasDTea|18|28-190-982-9759|9561.95|AUTOMOBILE|ainst the ironic, express theodolites. express, even pinto beans among the exp|
+8|Customer#000000008|I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5|17|27-147-574-9335|6819.74|BUILDING|among the slyly regular theodolites kindle blithely courts. carefully even theodolites haggle slyly along the ide|
+9|Customer#000000009|xKiAFTjUsCuxfeleNqefumTrjS|8|18-338-906-3675|8324.07|FURNITURE|r theodolites according to the requests wake thinly excuses: pending requests haggle furiousl|
+10|Customer#000000010|6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2|5|15-741-346-9870|2753.54|HOUSEHOLD|es regular deposits haggle. fur|
+11|Customer#000000011|PkWS 3HlXqwTuzrKg633BEi|23|33-464-151-3439|-272.60|BUILDING|ckages. requests sleep slyly. quickly even pinto beans promise above the slyly regular pinto beans. |
+12|Customer#000000012|9PWKuhzT4Zr1Q|13|23-791-276-1263|3396.49|HOUSEHOLD| to the carefully final braids. blithely regular requests nag. ironic theodolites boost quickly along|
+13|Customer#000000013|nsXQu0oVjD7PM659uC3SRSp|3|13-761-547-5974|3857.34|BUILDING|ounts sleep carefully after the close frays. carefully bold notornis use ironic requests. blithely|
+14|Customer#000000014|KXkletMlL2JQEA |1|11-845-129-3851|5266.30|FURNITURE|, ironic packages across the unus|
+15|Customer#000000015|YtWggXoOLdwdo7b0y,BZaGUQMLJMX1Y,EC,6Dn|23|33-687-542-7601|2788.52|HOUSEHOLD| platelets. regular deposits detect asymptotes. blithely unusual packages nag slyly at the fluf|
+16|Customer#000000016|cYiaeMLZSMAOQ2 d0W,|10|20-781-609-3107|4681.03|FURNITURE|kly silent courts. thinly regular theodolites sleep fluffily after |
+17|Customer#000000017|izrh 6jdqtp2eqdtbkswDD8SG4SzXruMfIXyR7|2|12-970-682-3487|6.34|AUTOMOBILE|packages wake! blithely even pint|
+18|Customer#000000018|3txGO AiuFux3zT0Z9NYaFRnZt|6|16-155-215-1315|5494.43|BUILDING|s sleep. carefully even instructions nag furiously alongside of t|
+19|Customer#000000019|uc,3bHIx84H,wdrmLOjVsiqXCq2tr|18|28-396-526-5053|8914.71|HOUSEHOLD| nag. furiously careful packages are slyly at the accounts. furiously regular in|
+20|Customer#000000020|JrPk8Pqplj4Ne|22|32-957-234-8742|7603.40|FURNITURE|g alongside of the special excuses-- fluffily enticing packages wake |
+21|Customer#000000021|XYmVpr9yAHDEn|8|18-902-614-8344|1428.25|MACHINERY| quickly final accounts integrate blithely furiously u|
+22|Customer#000000022|QI6p41,FNs5k7RZoCCVPUTkUdYpB|3|13-806-545-9701|591.98|MACHINERY|s nod furiously above the furiously ironic ideas. |
+23|Customer#000000023|OdY W13N7Be3OC5MpgfmcYss0Wn6TKT|3|13-312-472-8245|3332.02|HOUSEHOLD|deposits. special deposits cajole slyly. fluffily special deposits about the furiously |
+24|Customer#000000024|HXAFgIAyjxtdqwimt13Y3OZO 4xeLe7U8PqG|13|23-127-851-8031|9255.67|MACHINERY|into beans. fluffily final ideas haggle fluffily|
+25|Customer#000000025|Hp8GyFQgGHFYSilH5tBfe|12|22-603-468-3533|7133.70|FURNITURE|y. accounts sleep ruthlessly according to the regular theodolites. unusual instructions sleep. ironic, final|
+26|Customer#000000026|8ljrc5ZeMl7UciP|22|32-363-455-4837|5182.05|AUTOMOBILE|c requests use furiously ironic requests. slyly ironic dependencies us|
+27|Customer#000000027|IS8GIyxpBrLpMT0u7|3|13-137-193-2709|5679.84|BUILDING| about the carefully ironic pinto beans. accoun|
+28|Customer#000000028|iVyg0daQ,Tha8x2WPWA9m2529m|8|18-774-241-1462|1007.18|FURNITURE| along the regular deposits. furiously final pac|
+29|Customer#000000029|sJ5adtfyAkCK63df2,vF25zyQMVYE34uh|0|10-773-203-7342|7618.27|FURNITURE|its after the carefully final platelets x-ray against |
+30|Customer#000000030|nJDsELGAavU63Jl0c5NKsKfL8rIJQQkQnYL2QJY|1|11-764-165-5076|9321.01|BUILDING|lithely final requests. furiously unusual account|
+31|Customer#000000031|LUACbO0viaAv6eXOAebryDB xjVst|23|33-197-837-7094|5236.89|HOUSEHOLD|s use among the blithely pending depo|
+32|Customer#000000032|jD2xZzi UmId,DCtNBLXKj9q0Tlp2iQ6ZcO3J|15|25-430-914-2194|3471.53|BUILDING|cial ideas. final, furious requests across the e|
+33|Customer#000000033|qFSlMuLucBmx9xnn5ib2csWUweg D|17|27-375-391-1280|-78.56|AUTOMOBILE|s. slyly regular accounts are furiously. carefully pending requests|
+34|Customer#000000034|Q6G9wZ6dnczmtOx509xgE,M2KV|15|25-344-968-5422|8589.70|HOUSEHOLD|nder against the even, pending accounts. even|
+35|Customer#000000035|TEjWGE4nBzJL2|17|27-566-888-7431|1228.24|HOUSEHOLD|requests. special, express requests nag slyly furiousl|
+36|Customer#000000036|3TvCzjuPzpJ0,DdJ8kW5U|21|31-704-669-5769|4987.27|BUILDING|haggle. enticing, quiet platelets grow quickly bold sheaves. carefully regular acc|
+37|Customer#000000037|7EV4Pwh,3SboctTWt|8|18-385-235-7162|-917.75|FURNITURE|ilent packages are carefully among the deposits. furiousl|
+38|Customer#000000038|a5Ee5e9568R8RLP 2ap7|12|22-306-880-7212|6345.11|HOUSEHOLD|lar excuses. closely even asymptotes cajole blithely excuses. carefully silent pinto beans sleep carefully fin|
+39|Customer#000000039|nnbRg,Pvy33dfkorYE FdeZ60|2|12-387-467-6509|6264.31|AUTOMOBILE|tions. slyly silent excuses slee|
+40|Customer#000000040|gOnGWAyhSV1ofv|3|13-652-915-8939|1335.30|BUILDING|rges impress after the slyly ironic courts. foxes are. blithely |
+41|Customer#000000041|IM9mzmyoxeBmvNw8lA7G3Ydska2nkZF|10|20-917-711-4011|270.95|HOUSEHOLD|ly regular accounts hang bold, silent packages. unusual foxes haggle slyly above the special, final depo|
+42|Customer#000000042|ziSrvyyBke|5|15-416-330-4175|8727.01|BUILDING|ssly according to the pinto beans: carefully special requests across the even, pending accounts wake special|
+43|Customer#000000043|ouSbjHk8lh5fKX3zGso3ZSIj9Aa3PoaFd|19|29-316-665-2897|9904.28|MACHINERY|ial requests: carefully pending foxes detect quickly. carefully final courts cajole quickly. carefully|
+44|Customer#000000044|Oi,dOSPwDu4jo4x,,P85E0dmhZGvNtBwi|16|26-190-260-5375|7315.94|AUTOMOBILE|r requests around the unusual, bold a|
+45|Customer#000000045|4v3OcpFgoOmMG,CbnF,4mdC|9|19-715-298-9917|9983.38|AUTOMOBILE|nto beans haggle slyly alongside of t|
+46|Customer#000000046|eaTXWWm10L9|6|16-357-681-2007|5744.59|AUTOMOBILE|ctions. accounts sleep furiously even requests. regular, regular accounts cajole blithely around the final pa|
+47|Customer#000000047|b0UgocSqEW5 gdVbhNT|2|12-427-271-9466|274.58|BUILDING|ions. express, ironic instructions sleep furiously ironic ideas. furi|
+48|Customer#000000048|0UU iPhBupFvemNB|0|10-508-348-5882|3792.50|BUILDING|re fluffily pending foxes. pending, bold platelets sleep slyly. even platelets cajo|
+49|Customer#000000049|cNgAeX7Fqrdf7HQN9EwjUa4nxT,68L FKAxzl|10|20-908-631-4424|4573.94|FURNITURE|nusual foxes! fluffily pending packages maintain to the regular |
+50|Customer#000000050|9SzDYlkzxByyJ1QeTI o|6|16-658-112-3221|4266.13|MACHINERY|ts. furiously ironic accounts cajole furiously slyly ironic dinos.|
+51|Customer#000000051|uR,wEaiTvo4|12|22-344-885-4251|855.87|FURNITURE|eposits. furiously regular requests integrate carefully packages. furious|
+52|Customer#000000052|7 QOqGqqSy9jfV51BC71jcHJSD0|11|21-186-284-5998|5630.28|HOUSEHOLD|ic platelets use evenly even accounts. stealthy theodolites cajole furiou|
+53|Customer#000000053|HnaxHzTfFTZs8MuCpJyTbZ47Cm4wFOOgib|15|25-168-852-5363|4113.64|HOUSEHOLD|ar accounts are. even foxes are blithely. fluffily pending deposits boost|
+54|Customer#000000054|,k4vf 5vECGWFy,hosTE,|4|14-776-370-4745|868.90|AUTOMOBILE|sual, silent accounts. furiously express accounts cajole special deposits. final, final accounts use furi|
+55|Customer#000000055|zIRBR4KNEl HzaiV3a i9n6elrxzDEh8r8pDom|10|20-180-440-8525|4572.11|MACHINERY|ully unusual packages wake bravely bold packages. unusual requests boost deposits! blithely ironic packages ab|
+56|Customer#000000056|BJYZYJQk4yD5B|10|20-895-685-6920|6530.86|FURNITURE|. notornis wake carefully. carefully fluffy requests are furiously even accounts. slyly expre|
+57|Customer#000000057|97XYbsuOPRXPWU|21|31-835-306-1650|4151.93|AUTOMOBILE|ove the carefully special packages. even, unusual deposits sleep slyly pend|
+58|Customer#000000058|g9ap7Dk1Sv9fcXEWjpMYpBZIRUohi T|13|23-244-493-2508|6478.46|HOUSEHOLD|ideas. ironic ideas affix furiously express, final instructions. regular excuses use quickly e|
+59|Customer#000000059|zLOCP0wh92OtBihgspOGl4|1|11-355-584-3112|3458.60|MACHINERY|ously final packages haggle blithely after the express deposits. furiou|
+60|Customer#000000060|FyodhjwMChsZmUz7Jz0H|12|22-480-575-5866|2741.87|MACHINERY|latelets. blithely unusual courts boost furiously about the packages. blithely final instruct|
+61|Customer#000000061|9kndve4EAJxhg3veF BfXr7AqOsT39o gtqjaYE|17|27-626-559-8599|1536.24|FURNITURE|egular packages shall have to impress along the |
+62|Customer#000000062|upJK2Dnw13,|7|17-361-978-7059|595.61|MACHINERY|kly special dolphins. pinto beans are slyly. quickly regular accounts are furiously a|
+63|Customer#000000063|IXRSpVWWZraKII|21|31-952-552-9584|9331.13|AUTOMOBILE|ithely even accounts detect slyly above the fluffily ir|
+64|Customer#000000064|MbCeGY20kaKK3oalJD,OT|3|13-558-731-7204|-646.64|BUILDING|structions after the quietly ironic theodolites cajole be|
+65|Customer#000000065|RGT yzQ0y4l0H90P783LG4U95bXQFDRXbWa1sl,X|23|33-733-623-5267|8795.16|AUTOMOBILE|y final foxes serve carefully. theodolites are carefully. pending i|
+66|Customer#000000066|XbsEqXH1ETbJYYtA1A|22|32-213-373-5094|242.77|HOUSEHOLD|le slyly accounts. carefully silent packages benea|
+67|Customer#000000067|rfG0cOgtr5W8 xILkwp9fpCS8|9|19-403-114-4356|8166.59|MACHINERY|indle furiously final, even theodo|
+68|Customer#000000068|o8AibcCRkXvQFh8hF,7o|12|22-918-832-2411|6853.37|HOUSEHOLD| pending pinto beans impress realms. final dependencies |
+69|Customer#000000069|Ltx17nO9Wwhtdbe9QZVxNgP98V7xW97uvSH1prEw|9|19-225-978-5670|1709.28|HOUSEHOLD|thely final ideas around the quickly final dependencies affix carefully quickly final theodolites. final accounts c|
+70|Customer#000000070|mFowIuhnHjp2GjCiYYavkW kUwOjIaTCQ|22|32-828-107-2832|4867.52|FURNITURE|fter the special asymptotes. ideas after the unusual frets cajole quickly regular pinto be|
+71|Customer#000000071|TlGalgdXWBmMV,6agLyWYDyIz9MKzcY8gl,w6t1B|7|17-710-812-5403|-611.19|HOUSEHOLD|g courts across the regular, final pinto beans are blithely pending ac|
+72|Customer#000000072|putjlmskxE,zs,HqeIA9Wqu7dhgH5BVCwDwHHcf|2|12-759-144-9689|-362.86|FURNITURE|ithely final foxes sleep always quickly bold accounts. final wat|
+73|Customer#000000073|8IhIxreu4Ug6tt5mog4|0|10-473-439-3214|4288.50|BUILDING|usual, unusual packages sleep busily along the furiou|
+74|Customer#000000074|IkJHCA3ZThF7qL7VKcrU nRLl,kylf |4|14-199-862-7209|2764.43|MACHINERY|onic accounts. blithely slow packages would haggle carefully. qui|
+75|Customer#000000075|Dh 6jZ,cwxWLKQfRKkiGrzv6pm|18|28-247-803-9025|6684.10|AUTOMOBILE| instructions cajole even, even deposits. finally bold deposits use above the even pains. slyl|
+76|Customer#000000076|m3sbCvjMOHyaOofH,e UkGPtqc4|0|10-349-718-3044|5745.33|FURNITURE|pecial deposits. ironic ideas boost blithely according to the closely ironic theodolites! furiously final deposits n|
+77|Customer#000000077|4tAE5KdMFGD4byHtXF92vx|17|27-269-357-4674|1738.87|BUILDING|uffily silent requests. carefully ironic asymptotes among the ironic hockey players are carefully bli|
+78|Customer#000000078|HBOta,ZNqpg3U2cSL0kbrftkPwzX|9|19-960-700-9191|7136.97|FURNITURE|ests. blithely bold pinto beans h|
+79|Customer#000000079|n5hH2ftkVRwW8idtD,BmM2|15|25-147-850-4166|5121.28|MACHINERY|es. packages haggle furiously. regular, special requests poach after the quickly express ideas. blithely pending re|
+80|Customer#000000080|K,vtXp8qYB |0|10-267-172-7101|7383.53|FURNITURE|tect among the dependencies. bold accounts engage closely even pinto beans. ca|
+81|Customer#000000081|SH6lPA7JiiNC6dNTrR|20|30-165-277-3269|2023.71|BUILDING|r packages. fluffily ironic requests cajole fluffily. ironically regular theodolit|
+82|Customer#000000082|zhG3EZbap4c992Gj3bK,3Ne,Xn|18|28-159-442-5305|9468.34|AUTOMOBILE|s wake. bravely regular accounts are furiously. regula|
+83|Customer#000000083|HnhTNB5xpnSF20JBH4Ycs6psVnkC3RDf|22|32-817-154-4122|6463.51|BUILDING|ccording to the quickly bold warhorses. final, regular foxes integrate carefully. bold packages nag blithely ev|
+84|Customer#000000084|lpXz6Fwr9945rnbtMc8PlueilS1WmASr CB|11|21-546-818-3802|5174.71|FURNITURE|ly blithe foxes. special asymptotes haggle blithely against the furiously regular depo|
+85|Customer#000000085|siRerlDwiolhYR 8FgksoezycLj|5|15-745-585-8219|3386.64|FURNITURE|ronic ideas use above the slowly pendin|
+86|Customer#000000086|US6EGGHXbTTXPL9SBsxQJsuvy|0|10-677-951-2353|3306.32|HOUSEHOLD|quests. pending dugouts are carefully aroun|
+87|Customer#000000087|hgGhHVSWQl 6jZ6Ev|23|33-869-884-7053|6327.54|FURNITURE|hely ironic requests integrate according to the ironic accounts. slyly regular pla|
+88|Customer#000000088|wtkjBN9eyrFuENSMmMFlJ3e7jE5KXcg|16|26-516-273-2566|8031.44|AUTOMOBILE|s are quickly above the quickly ironic instructions; even requests about the carefully final deposi|
+89|Customer#000000089|dtR, y9JQWUO6FoJExyp8whOU|14|24-394-451-5404|1530.76|FURNITURE|counts are slyly beyond the slyly final accounts. quickly final ideas wake. r|
+90|Customer#000000090|QxCzH7VxxYUWwfL7|16|26-603-491-1238|7354.23|BUILDING|sly across the furiously even |
+91|Customer#000000091|S8OMYFrpHwoNHaGBeuS6E 6zhHGZiprw1b7 q|8|18-239-400-3677|4643.14|AUTOMOBILE|onic accounts. fluffily silent pinto beans boost blithely according to the fluffily exp|
+92|Customer#000000092|obP PULk2LH LqNF,K9hcbNqnLAkJVsl5xqSrY,|2|12-446-416-8471|1182.91|MACHINERY|. pinto beans hang slyly final deposits. ac|
+93|Customer#000000093|EHXBr2QGdh|7|17-359-388-5266|2182.52|MACHINERY|press deposits. carefully regular platelets r|
+94|Customer#000000094|IfVNIN9KtkScJ9dUjK3Pg5gY1aFeaXewwf|9|19-953-499-8833|5500.11|HOUSEHOLD|latelets across the bold, final requests sleep according to the fluffily bold accounts. unusual deposits amon|
+95|Customer#000000095|EU0xvmWvOmUUn5J,2z85DQyG7QCJ9Xq7|15|25-923-255-2929|5327.38|MACHINERY|ithely. ruthlessly final requests wake slyly alongside of the furiously silent pinto beans. even the|
+96|Customer#000000096|vWLOrmXhRR|8|18-422-845-1202|6323.92|AUTOMOBILE|press requests believe furiously. carefully final instructions snooze carefully. |
+97|Customer#000000097|OApyejbhJG,0Iw3j rd1M|17|27-588-919-5638|2164.48|AUTOMOBILE|haggle slyly. bold, special ideas are blithely above the thinly bold theo|
+98|Customer#000000098|7yiheXNSpuEAwbswDW|12|22-885-845-6889|-551.37|BUILDING|ages. furiously pending accounts are quickly carefully final foxes: busily pe|
+99|Customer#000000099|szsrOiPtCHVS97Lt|15|25-515-237-9232|4088.65|HOUSEHOLD|cajole slyly about the regular theodolites! furiously bold requests nag along the pending, regular packages. somas|
+100|Customer#000000100|fptUABXcmkC5Wx|20|30-749-445-4907|9889.89|FURNITURE|was furiously fluffily quiet deposits. silent, pending requests boost against |
+101|Customer#000000101|sMmL2rNeHDltovSm Y|2|12-514-298-3699|7470.96|MACHINERY| sleep. pending packages detect slyly ironic pack|
+102|Customer#000000102|UAtflJ06 fn9zBfKjInkQZlWtqaA|19|29-324-978-8538|8462.17|BUILDING|ously regular dependencies nag among the furiously express dinos. blithely final|
+103|Customer#000000103|8KIsQX4LJ7QMsj6DrtFtXu0nUEdV,8a|9|19-216-107-2107|2757.45|BUILDING|furiously pending notornis boost slyly around the blithely ironic ideas? final, even instructions cajole fl|
+104|Customer#000000104|9mcCK L7rt0SwiYtrbO88DiZS7U d7M|10|20-966-284-8065|-588.38|FURNITURE|rate carefully slyly special pla|
+105|Customer#000000105|4iSJe4L SPjg7kJj98Yz3z0B|10|20-793-553-6417|9091.82|MACHINERY|l pains cajole even accounts. quietly final instructi|
+106|Customer#000000106|xGCOEAUjUNG|1|11-751-989-4627|3288.42|MACHINERY|lose slyly. ironic accounts along the evenly regular theodolites wake about the special, final gifts. |
+107|Customer#000000107|Zwg64UZ,q7GRqo3zm7P1tZIRshBDz|15|25-336-529-9919|2514.15|AUTOMOBILE|counts cajole slyly. regular requests wake. furiously regular deposits about the blithely final fo|
+108|Customer#000000108|GPoeEvpKo1|5|15-908-619-7526|2259.38|BUILDING|refully ironic deposits sleep. regular, unusual requests wake slyly|
+109|Customer#000000109|OOOkYBgCMzgMQXUmkocoLb56rfrdWp2NE2c|16|26-992-422-8153|-716.10|BUILDING|es. fluffily final dependencies sleep along the blithely even pinto beans. final deposits haggle furiously furiou|
+110|Customer#000000110|mymPfgphaYXNYtk|10|20-893-536-2069|7462.99|AUTOMOBILE|nto beans cajole around the even, final deposits. quickly bold packages according to the furiously regular dept|
+111|Customer#000000111|CBSbPyOWRorloj2TBvrK9qp9tHBs|22|32-582-283-7528|6505.26|MACHINERY|ly unusual instructions detect fluffily special deposits-- theodolites nag carefully during the ironic dependencies|
+112|Customer#000000112|RcfgG3bO7QeCnfjqJT1|19|29-233-262-8382|2953.35|FURNITURE|rmanently unusual multipliers. blithely ruthless deposits are furiously along the|
+113|Customer#000000113|eaOl5UBXIvdY57rglaIzqvfPD,MYfK|12|22-302-930-4756|2912.00|BUILDING|usly regular theodolites boost furiously doggedly pending instructio|
+114|Customer#000000114|xAt 5f5AlFIU|14|24-805-212-7646|1027.46|FURNITURE|der the carefully express theodolites are after the packages. packages are. bli|
+115|Customer#000000115|0WFt1IXENmUT2BgbsB0ShVKJZt0HCBCbFl0aHc|8|18-971-699-1843|7508.92|HOUSEHOLD|sits haggle above the carefully ironic theodolite|
+116|Customer#000000116|yCuVxIgsZ3,qyK2rloThy3u|16|26-632-309-5792|8403.99|BUILDING|as. quickly final sauternes haggle slyly carefully even packages. brave, ironic pinto beans are above the furious|
+117|Customer#000000117|uNhM,PzsRA3S,5Y Ge5Npuhi|24|34-403-631-3505|3950.83|FURNITURE|affix. instructions are furiously sl|
+118|Customer#000000118|OVnFuHygK9wx3xpg8|18|28-639-943-7051|3582.37|AUTOMOBILE|uick packages alongside of the furiously final deposits haggle above the fluffily even foxes. blithely dogged dep|
+119|Customer#000000119|M1ETOIecuvH8DtM0Y0nryXfW|7|17-697-919-8406|3930.35|FURNITURE|express ideas. blithely ironic foxes thrash. special acco|
+120|Customer#000000120|zBNna00AEInqyO1|12|22-291-534-1571|363.75|MACHINERY| quickly. slyly ironic requests cajole blithely furiously final dependen|
+121|Customer#000000121|tv nCR2YKupGN73mQudO|17|27-411-990-2959|6428.32|BUILDING|uriously stealthy ideas. carefully final courts use carefully|
+122|Customer#000000122|yp5slqoNd26lAENZW3a67wSfXA6hTF|3|13-702-694-4520|7865.46|HOUSEHOLD| the special packages hinder blithely around the permanent requests. bold depos|
+123|Customer#000000123|YsOnaaER8MkvK5cpf4VSlq|5|15-817-151-1168|5897.83|BUILDING|ependencies. regular, ironic requests are fluffily regu|
+124|Customer#000000124|aTbyVAW5tCd,v09O|18|28-183-750-7809|1842.49|AUTOMOBILE|le fluffily even dependencies. quietly s|
+125|Customer#000000125|,wSZXdVR xxIIfm9s8ITyLl3kgjT6UC07GY0Y|19|29-261-996-3120|-234.12|FURNITURE|x-ray finally after the packages? regular requests c|
+126|Customer#000000126|ha4EHmbx3kg DYCsP6DFeUOmavtQlHhcfaqr|22|32-755-914-7592|1001.39|HOUSEHOLD|s about the even instructions boost carefully furiously ironic pearls. ruthless, |
+127|Customer#000000127|Xyge4DX2rXKxXyye1Z47LeLVEYMLf4Bfcj|21|31-101-672-2951|9280.71|MACHINERY|ic, unusual theodolites nod silently after the final, ironic instructions: pending r|
+128|Customer#000000128|AmKUMlJf2NRHcKGmKjLS|4|14-280-874-8044|-986.96|HOUSEHOLD|ing packages integrate across the slyly unusual dugouts. blithely silent ideas sublate carefully. blithely expr|
+129|Customer#000000129|q7m7rbMM0BpaCdmxloCgBDRCleXsXkdD8kf|7|17-415-148-7416|9127.27|HOUSEHOLD| unusual deposits boost carefully furiously silent ideas. pending accounts cajole slyly across|
+130|Customer#000000130|RKPx2OfZy0Vn 8wGWZ7F2EAvmMORl1k8iH|9|19-190-993-9281|5073.58|HOUSEHOLD|ix slowly. express packages along the furiously ironic requests integrate daringly deposits. fur|
+131|Customer#000000131|jyN6lAjb1FtH10rMC,XzlWyCBrg75|11|21-840-210-3572|8595.53|HOUSEHOLD|jole special packages. furiously final dependencies about the furiously speci|
+132|Customer#000000132|QM5YabAsTLp9|4|14-692-150-9717|162.57|HOUSEHOLD|uickly carefully special theodolites. carefully regular requests against the blithely unusual instructions |
+133|Customer#000000133|IMCuXdpIvdkYO92kgDGuyHgojcUs88p|17|27-408-997-8430|2314.67|AUTOMOBILE|t packages. express pinto beans are blithely along the unusual, even theodolites. silent packages use fu|
+134|Customer#000000134|sUiZ78QCkTQPICKpA9OBzkUp2FM|11|21-200-159-5932|4608.90|BUILDING|yly fluffy foxes boost final ideas. b|
+135|Customer#000000135|oZK,oC0 fdEpqUML|19|29-399-293-6241|8732.91|FURNITURE| the slyly final accounts. deposits cajole carefully. carefully sly packag|
+136|Customer#000000136|QoLsJ0v5C1IQbh,DS1|7|17-501-210-4726|-842.39|FURNITURE|ackages sleep ironic, final courts. even requests above the blithely bold requests g|
+137|Customer#000000137|cdW91p92rlAEHgJafqYyxf1Q|16|26-777-409-5654|7838.30|HOUSEHOLD|carefully regular theodolites use. silent dolphins cajo|
+138|Customer#000000138|5uyLAeY7HIGZqtu66Yn08f|5|15-394-860-4589|430.59|MACHINERY|ts doze on the busy ideas. regular|
+139|Customer#000000139|3ElvBwudHKL02732YexGVFVt |9|19-140-352-1403|7897.78|MACHINERY|nstructions. quickly ironic ideas are carefully. bold, |
+140|Customer#000000140|XRqEPiKgcETII,iOLDZp5jA|4|14-273-885-6505|9963.15|MACHINERY|ies detect slyly ironic accounts. slyly ironic theodolites hag|
+141|Customer#000000141|5IW,WROVnikc3l7DwiUDGQNGsLBGOL6Dc0|1|11-936-295-6204|6706.14|FURNITURE|packages nag furiously. carefully unusual accounts snooze according to the fluffily regular pinto beans. slyly spec|
+142|Customer#000000142|AnJ5lxtLjioClr2khl9pb8NLxG2,|9|19-407-425-2584|2209.81|AUTOMOBILE|. even, express theodolites upo|
+143|Customer#000000143|681r22uL452zqk 8By7I9o9enQfx0|16|26-314-406-7725|2186.50|MACHINERY|across the blithely unusual requests haggle theodo|
+144|Customer#000000144|VxYZ3ebhgbltnetaGjNC8qCccjYU05 fePLOno8y|1|11-717-379-4478|6417.31|MACHINERY|ges. slyly regular accounts are slyly. bold, idle reque|
+145|Customer#000000145|kQjHmt2kcec cy3hfMh969u|13|23-562-444-8454|9748.93|HOUSEHOLD|ests? express, express instructions use. blithely fina|
+146|Customer#000000146|GdxkdXG9u7iyI1,,y5tq4ZyrcEy|3|13-835-723-3223|3328.68|FURNITURE|ffily regular dinos are slyly unusual requests. slyly specia|
+147|Customer#000000147|6VvIwbVdmcsMzuu,C84GtBWPaipGfi7DV|18|28-803-187-4335|8071.40|AUTOMOBILE|ress packages above the blithely regular packages sleep fluffily blithely ironic accounts. |
+148|Customer#000000148|BhSPlEWGvIJyT9swk vCWE|11|21-562-498-6636|2135.60|HOUSEHOLD|ing to the carefully ironic requests. carefully regular dependencies about the theodolites wake furious|
+149|Customer#000000149|3byTHCp2mNLPigUrrq|19|29-797-439-6760|8959.65|AUTOMOBILE|al instructions haggle against the slyly bold w|
+150|Customer#000000150|zeoGShTjCwGPplOWFkLURrh41O0AZ8dwNEEN4 |18|28-328-564-7630|3849.48|MACHINERY|ole blithely among the furiously pending packages. furiously bold ideas wake fluffily ironic idea|
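The rows above follow the pipe-delimited TPC-H customer layout: custkey|name|address|nationkey|phone|acctbal|mktsegment|comment. A minimal, illustrative Java sketch of pulling typed fields out of one such row; the class name and field choices are ours, not identifiers from this patch:

    public class CustomerRowSketch {
        public static void main(String[] args) {
            // Row 51 from the data above, in pipe-delimited TPC-H customer format.
            String line = "51|Customer#000000051|uR,wEaiTvo4|12|22-344-885-4251"
                    + "|855.87|FURNITURE|eposits. furiously regular requests"
                    + " integrate carefully packages. furious|";
            String[] f = line.split("\\|"); // trailing '|' leaves 8 fields; the empty tail is dropped
            long custkey = Long.parseLong(f[0]);       // 51
            int nationkey = Integer.parseInt(f[3]);    // 12
            double acctbal = Double.parseDouble(f[5]); // 855.87
            System.out.println(custkey + "/" + nationkey + "/" + acctbal + "/" + f[6]);
        }
    }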
diff --git a/genomix/genomix-core/src/test/resources/expected/part-0 b/genomix/genomix-core/src/test/resources/expected/part-0
new file mode 100755
index 0000000..ce3b00c
--- /dev/null
+++ b/genomix/genomix-core/src/test/resources/expected/part-0
@@ -0,0 +1,150 @@
+100|Customer#000000100|fptUABXcmkC5Wx|20|30-749-445-4907|9889.89|FURNITURE|was furiously fluffily quiet deposits. silent, pending requests boost against |
+101|Customer#000000101|sMmL2rNeHDltovSm Y|2|12-514-298-3699|7470.96|MACHINERY| sleep. pending packages detect slyly ironic pack|
+102|Customer#000000102|UAtflJ06 fn9zBfKjInkQZlWtqaA|19|29-324-978-8538|8462.17|BUILDING|ously regular dependencies nag among the furiously express dinos. blithely final|
+103|Customer#000000103|8KIsQX4LJ7QMsj6DrtFtXu0nUEdV,8a|9|19-216-107-2107|2757.45|BUILDING|furiously pending notornis boost slyly around the blithely ironic ideas? final, even instructions cajole fl|
+104|Customer#000000104|9mcCK L7rt0SwiYtrbO88DiZS7U d7M|10|20-966-284-8065|-588.38|FURNITURE|rate carefully slyly special pla|
+105|Customer#000000105|4iSJe4L SPjg7kJj98Yz3z0B|10|20-793-553-6417|9091.82|MACHINERY|l pains cajole even accounts. quietly final instructi|
+106|Customer#000000106|xGCOEAUjUNG|1|11-751-989-4627|3288.42|MACHINERY|lose slyly. ironic accounts along the evenly regular theodolites wake about the special, final gifts. |
+107|Customer#000000107|Zwg64UZ,q7GRqo3zm7P1tZIRshBDz|15|25-336-529-9919|2514.15|AUTOMOBILE|counts cajole slyly. regular requests wake. furiously regular deposits about the blithely final fo|
+108|Customer#000000108|GPoeEvpKo1|5|15-908-619-7526|2259.38|BUILDING|refully ironic deposits sleep. regular, unusual requests wake slyly|
+109|Customer#000000109|OOOkYBgCMzgMQXUmkocoLb56rfrdWp2NE2c|16|26-992-422-8153|-716.10|BUILDING|es. fluffily final dependencies sleep along the blithely even pinto beans. final deposits haggle furiously furiou|
+10|Customer#000000010|6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2|5|15-741-346-9870|2753.54|HOUSEHOLD|es regular deposits haggle. fur|
+110|Customer#000000110|mymPfgphaYXNYtk|10|20-893-536-2069|7462.99|AUTOMOBILE|nto beans cajole around the even, final deposits. quickly bold packages according to the furiously regular dept|
+111|Customer#000000111|CBSbPyOWRorloj2TBvrK9qp9tHBs|22|32-582-283-7528|6505.26|MACHINERY|ly unusual instructions detect fluffily special deposits-- theodolites nag carefully during the ironic dependencies|
+112|Customer#000000112|RcfgG3bO7QeCnfjqJT1|19|29-233-262-8382|2953.35|FURNITURE|rmanently unusual multipliers. blithely ruthless deposits are furiously along the|
+113|Customer#000000113|eaOl5UBXIvdY57rglaIzqvfPD,MYfK|12|22-302-930-4756|2912.00|BUILDING|usly regular theodolites boost furiously doggedly pending instructio|
+114|Customer#000000114|xAt 5f5AlFIU|14|24-805-212-7646|1027.46|FURNITURE|der the carefully express theodolites are after the packages. packages are. bli|
+115|Customer#000000115|0WFt1IXENmUT2BgbsB0ShVKJZt0HCBCbFl0aHc|8|18-971-699-1843|7508.92|HOUSEHOLD|sits haggle above the carefully ironic theodolite|
+116|Customer#000000116|yCuVxIgsZ3,qyK2rloThy3u|16|26-632-309-5792|8403.99|BUILDING|as. quickly final sauternes haggle slyly carefully even packages. brave, ironic pinto beans are above the furious|
+117|Customer#000000117|uNhM,PzsRA3S,5Y Ge5Npuhi|24|34-403-631-3505|3950.83|FURNITURE|affix. instructions are furiously sl|
+118|Customer#000000118|OVnFuHygK9wx3xpg8|18|28-639-943-7051|3582.37|AUTOMOBILE|uick packages alongside of the furiously final deposits haggle above the fluffily even foxes. blithely dogged dep|
+119|Customer#000000119|M1ETOIecuvH8DtM0Y0nryXfW|7|17-697-919-8406|3930.35|FURNITURE|express ideas. blithely ironic foxes thrash. special acco|
+11|Customer#000000011|PkWS 3HlXqwTuzrKg633BEi|23|33-464-151-3439|-272.60|BUILDING|ckages. requests sleep slyly. quickly even pinto beans promise above the slyly regular pinto beans. |
+120|Customer#000000120|zBNna00AEInqyO1|12|22-291-534-1571|363.75|MACHINERY| quickly. slyly ironic requests cajole blithely furiously final dependen|
+121|Customer#000000121|tv nCR2YKupGN73mQudO|17|27-411-990-2959|6428.32|BUILDING|uriously stealthy ideas. carefully final courts use carefully|
+122|Customer#000000122|yp5slqoNd26lAENZW3a67wSfXA6hTF|3|13-702-694-4520|7865.46|HOUSEHOLD| the special packages hinder blithely around the permanent requests. bold depos|
+123|Customer#000000123|YsOnaaER8MkvK5cpf4VSlq|5|15-817-151-1168|5897.83|BUILDING|ependencies. regular, ironic requests are fluffily regu|
+124|Customer#000000124|aTbyVAW5tCd,v09O|18|28-183-750-7809|1842.49|AUTOMOBILE|le fluffily even dependencies. quietly s|
+125|Customer#000000125|,wSZXdVR xxIIfm9s8ITyLl3kgjT6UC07GY0Y|19|29-261-996-3120|-234.12|FURNITURE|x-ray finally after the packages? regular requests c|
+126|Customer#000000126|ha4EHmbx3kg DYCsP6DFeUOmavtQlHhcfaqr|22|32-755-914-7592|1001.39|HOUSEHOLD|s about the even instructions boost carefully furiously ironic pearls. ruthless, |
+127|Customer#000000127|Xyge4DX2rXKxXyye1Z47LeLVEYMLf4Bfcj|21|31-101-672-2951|9280.71|MACHINERY|ic, unusual theodolites nod silently after the final, ironic instructions: pending r|
+128|Customer#000000128|AmKUMlJf2NRHcKGmKjLS|4|14-280-874-8044|-986.96|HOUSEHOLD|ing packages integrate across the slyly unusual dugouts. blithely silent ideas sublate carefully. blithely expr|
+129|Customer#000000129|q7m7rbMM0BpaCdmxloCgBDRCleXsXkdD8kf|7|17-415-148-7416|9127.27|HOUSEHOLD| unusual deposits boost carefully furiously silent ideas. pending accounts cajole slyly across|
+12|Customer#000000012|9PWKuhzT4Zr1Q|13|23-791-276-1263|3396.49|HOUSEHOLD| to the carefully final braids. blithely regular requests nag. ironic theodolites boost quickly along|
+130|Customer#000000130|RKPx2OfZy0Vn 8wGWZ7F2EAvmMORl1k8iH|9|19-190-993-9281|5073.58|HOUSEHOLD|ix slowly. express packages along the furiously ironic requests integrate daringly deposits. fur|
+131|Customer#000000131|jyN6lAjb1FtH10rMC,XzlWyCBrg75|11|21-840-210-3572|8595.53|HOUSEHOLD|jole special packages. furiously final dependencies about the furiously speci|
+132|Customer#000000132|QM5YabAsTLp9|4|14-692-150-9717|162.57|HOUSEHOLD|uickly carefully special theodolites. carefully regular requests against the blithely unusual instructions |
+133|Customer#000000133|IMCuXdpIvdkYO92kgDGuyHgojcUs88p|17|27-408-997-8430|2314.67|AUTOMOBILE|t packages. express pinto beans are blithely along the unusual, even theodolites. silent packages use fu|
+134|Customer#000000134|sUiZ78QCkTQPICKpA9OBzkUp2FM|11|21-200-159-5932|4608.90|BUILDING|yly fluffy foxes boost final ideas. b|
+135|Customer#000000135|oZK,oC0 fdEpqUML|19|29-399-293-6241|8732.91|FURNITURE| the slyly final accounts. deposits cajole carefully. carefully sly packag|
+136|Customer#000000136|QoLsJ0v5C1IQbh,DS1|7|17-501-210-4726|-842.39|FURNITURE|ackages sleep ironic, final courts. even requests above the blithely bold requests g|
+137|Customer#000000137|cdW91p92rlAEHgJafqYyxf1Q|16|26-777-409-5654|7838.30|HOUSEHOLD|carefully regular theodolites use. silent dolphins cajo|
+138|Customer#000000138|5uyLAeY7HIGZqtu66Yn08f|5|15-394-860-4589|430.59|MACHINERY|ts doze on the busy ideas. regular|
+139|Customer#000000139|3ElvBwudHKL02732YexGVFVt |9|19-140-352-1403|7897.78|MACHINERY|nstructions. quickly ironic ideas are carefully. bold, |
+13|Customer#000000013|nsXQu0oVjD7PM659uC3SRSp|3|13-761-547-5974|3857.34|BUILDING|ounts sleep carefully after the close frays. carefully bold notornis use ironic requests. blithely|
+140|Customer#000000140|XRqEPiKgcETII,iOLDZp5jA|4|14-273-885-6505|9963.15|MACHINERY|ies detect slyly ironic accounts. slyly ironic theodolites hag|
+141|Customer#000000141|5IW,WROVnikc3l7DwiUDGQNGsLBGOL6Dc0|1|11-936-295-6204|6706.14|FURNITURE|packages nag furiously. carefully unusual accounts snooze according to the fluffily regular pinto beans. slyly spec|
+142|Customer#000000142|AnJ5lxtLjioClr2khl9pb8NLxG2,|9|19-407-425-2584|2209.81|AUTOMOBILE|. even, express theodolites upo|
+143|Customer#000000143|681r22uL452zqk 8By7I9o9enQfx0|16|26-314-406-7725|2186.50|MACHINERY|across the blithely unusual requests haggle theodo|
+144|Customer#000000144|VxYZ3ebhgbltnetaGjNC8qCccjYU05 fePLOno8y|1|11-717-379-4478|6417.31|MACHINERY|ges. slyly regular accounts are slyly. bold, idle reque|
+145|Customer#000000145|kQjHmt2kcec cy3hfMh969u|13|23-562-444-8454|9748.93|HOUSEHOLD|ests? express, express instructions use. blithely fina|
+146|Customer#000000146|GdxkdXG9u7iyI1,,y5tq4ZyrcEy|3|13-835-723-3223|3328.68|FURNITURE|ffily regular dinos are slyly unusual requests. slyly specia|
+147|Customer#000000147|6VvIwbVdmcsMzuu,C84GtBWPaipGfi7DV|18|28-803-187-4335|8071.40|AUTOMOBILE|ress packages above the blithely regular packages sleep fluffily blithely ironic accounts. |
+148|Customer#000000148|BhSPlEWGvIJyT9swk vCWE|11|21-562-498-6636|2135.60|HOUSEHOLD|ing to the carefully ironic requests. carefully regular dependencies about the theodolites wake furious|
+149|Customer#000000149|3byTHCp2mNLPigUrrq|19|29-797-439-6760|8959.65|AUTOMOBILE|al instructions haggle against the slyly bold w|
+14|Customer#000000014|KXkletMlL2JQEA |1|11-845-129-3851|5266.30|FURNITURE|, ironic packages across the unus|
+150|Customer#000000150|zeoGShTjCwGPplOWFkLURrh41O0AZ8dwNEEN4 |18|28-328-564-7630|3849.48|MACHINERY|ole blithely among the furiously pending packages. furiously bold ideas wake fluffily ironic idea|
+15|Customer#000000015|YtWggXoOLdwdo7b0y,BZaGUQMLJMX1Y,EC,6Dn|23|33-687-542-7601|2788.52|HOUSEHOLD| platelets. regular deposits detect asymptotes. blithely unusual packages nag slyly at the fluf|
+16|Customer#000000016|cYiaeMLZSMAOQ2 d0W,|10|20-781-609-3107|4681.03|FURNITURE|kly silent courts. thinly regular theodolites sleep fluffily after |
+17|Customer#000000017|izrh 6jdqtp2eqdtbkswDD8SG4SzXruMfIXyR7|2|12-970-682-3487|6.34|AUTOMOBILE|packages wake! blithely even pint|
+18|Customer#000000018|3txGO AiuFux3zT0Z9NYaFRnZt|6|16-155-215-1315|5494.43|BUILDING|s sleep. carefully even instructions nag furiously alongside of t|
+19|Customer#000000019|uc,3bHIx84H,wdrmLOjVsiqXCq2tr|18|28-396-526-5053|8914.71|HOUSEHOLD| nag. furiously careful packages are slyly at the accounts. furiously regular in|
+1|Customer#000000001|IVhzIApeRb ot,c,E|15|25-989-741-2988|711.56|BUILDING|to the even, regular platelets. regular, ironic epitaphs nag e|
+20|Customer#000000020|JrPk8Pqplj4Ne|22|32-957-234-8742|7603.40|FURNITURE|g alongside of the special excuses-- fluffily enticing packages wake |
+21|Customer#000000021|XYmVpr9yAHDEn|8|18-902-614-8344|1428.25|MACHINERY| quickly final accounts integrate blithely furiously u|
+22|Customer#000000022|QI6p41,FNs5k7RZoCCVPUTkUdYpB|3|13-806-545-9701|591.98|MACHINERY|s nod furiously above the furiously ironic ideas. |
+23|Customer#000000023|OdY W13N7Be3OC5MpgfmcYss0Wn6TKT|3|13-312-472-8245|3332.02|HOUSEHOLD|deposits. special deposits cajole slyly. fluffily special deposits about the furiously |
+24|Customer#000000024|HXAFgIAyjxtdqwimt13Y3OZO 4xeLe7U8PqG|13|23-127-851-8031|9255.67|MACHINERY|into beans. fluffily final ideas haggle fluffily|
+25|Customer#000000025|Hp8GyFQgGHFYSilH5tBfe|12|22-603-468-3533|7133.70|FURNITURE|y. accounts sleep ruthlessly according to the regular theodolites. unusual instructions sleep. ironic, final|
+26|Customer#000000026|8ljrc5ZeMl7UciP|22|32-363-455-4837|5182.05|AUTOMOBILE|c requests use furiously ironic requests. slyly ironic dependencies us|
+27|Customer#000000027|IS8GIyxpBrLpMT0u7|3|13-137-193-2709|5679.84|BUILDING| about the carefully ironic pinto beans. accoun|
+28|Customer#000000028|iVyg0daQ,Tha8x2WPWA9m2529m|8|18-774-241-1462|1007.18|FURNITURE| along the regular deposits. furiously final pac|
+29|Customer#000000029|sJ5adtfyAkCK63df2,vF25zyQMVYE34uh|0|10-773-203-7342|7618.27|FURNITURE|its after the carefully final platelets x-ray against |
+2|Customer#000000002|XSTf4,NCwDVaWNe6tEgvwfmRchLXak|13|23-768-687-3665|121.65|AUTOMOBILE|l accounts. blithely ironic theodolites integrate boldly: caref|
+30|Customer#000000030|nJDsELGAavU63Jl0c5NKsKfL8rIJQQkQnYL2QJY|1|11-764-165-5076|9321.01|BUILDING|lithely final requests. furiously unusual account|
+31|Customer#000000031|LUACbO0viaAv6eXOAebryDB xjVst|23|33-197-837-7094|5236.89|HOUSEHOLD|s use among the blithely pending depo|
+32|Customer#000000032|jD2xZzi UmId,DCtNBLXKj9q0Tlp2iQ6ZcO3J|15|25-430-914-2194|3471.53|BUILDING|cial ideas. final, furious requests across the e|
+33|Customer#000000033|qFSlMuLucBmx9xnn5ib2csWUweg D|17|27-375-391-1280|-78.56|AUTOMOBILE|s. slyly regular accounts are furiously. carefully pending requests|
+34|Customer#000000034|Q6G9wZ6dnczmtOx509xgE,M2KV|15|25-344-968-5422|8589.70|HOUSEHOLD|nder against the even, pending accounts. even|
+35|Customer#000000035|TEjWGE4nBzJL2|17|27-566-888-7431|1228.24|HOUSEHOLD|requests. special, express requests nag slyly furiousl|
+36|Customer#000000036|3TvCzjuPzpJ0,DdJ8kW5U|21|31-704-669-5769|4987.27|BUILDING|haggle. enticing, quiet platelets grow quickly bold sheaves. carefully regular acc|
+37|Customer#000000037|7EV4Pwh,3SboctTWt|8|18-385-235-7162|-917.75|FURNITURE|ilent packages are carefully among the deposits. furiousl|
+38|Customer#000000038|a5Ee5e9568R8RLP 2ap7|12|22-306-880-7212|6345.11|HOUSEHOLD|lar excuses. closely even asymptotes cajole blithely excuses. carefully silent pinto beans sleep carefully fin|
+39|Customer#000000039|nnbRg,Pvy33dfkorYE FdeZ60|2|12-387-467-6509|6264.31|AUTOMOBILE|tions. slyly silent excuses slee|
+3|Customer#000000003|MG9kdTD2WBHm|1|11-719-748-3364|7498.12|AUTOMOBILE| deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov|
+40|Customer#000000040|gOnGWAyhSV1ofv|3|13-652-915-8939|1335.30|BUILDING|rges impress after the slyly ironic courts. foxes are. blithely |
+41|Customer#000000041|IM9mzmyoxeBmvNw8lA7G3Ydska2nkZF|10|20-917-711-4011|270.95|HOUSEHOLD|ly regular accounts hang bold, silent packages. unusual foxes haggle slyly above the special, final depo|
+42|Customer#000000042|ziSrvyyBke|5|15-416-330-4175|8727.01|BUILDING|ssly according to the pinto beans: carefully special requests across the even, pending accounts wake special|
+43|Customer#000000043|ouSbjHk8lh5fKX3zGso3ZSIj9Aa3PoaFd|19|29-316-665-2897|9904.28|MACHINERY|ial requests: carefully pending foxes detect quickly. carefully final courts cajole quickly. carefully|
+44|Customer#000000044|Oi,dOSPwDu4jo4x,,P85E0dmhZGvNtBwi|16|26-190-260-5375|7315.94|AUTOMOBILE|r requests around the unusual, bold a|
+45|Customer#000000045|4v3OcpFgoOmMG,CbnF,4mdC|9|19-715-298-9917|9983.38|AUTOMOBILE|nto beans haggle slyly alongside of t|
+46|Customer#000000046|eaTXWWm10L9|6|16-357-681-2007|5744.59|AUTOMOBILE|ctions. accounts sleep furiously even requests. regular, regular accounts cajole blithely around the final pa|
+47|Customer#000000047|b0UgocSqEW5 gdVbhNT|2|12-427-271-9466|274.58|BUILDING|ions. express, ironic instructions sleep furiously ironic ideas. furi|
+48|Customer#000000048|0UU iPhBupFvemNB|0|10-508-348-5882|3792.50|BUILDING|re fluffily pending foxes. pending, bold platelets sleep slyly. even platelets cajo|
+49|Customer#000000049|cNgAeX7Fqrdf7HQN9EwjUa4nxT,68L FKAxzl|10|20-908-631-4424|4573.94|FURNITURE|nusual foxes! fluffily pending packages maintain to the regular |
+4|Customer#000000004|XxVSJsLAGtn|4|14-128-190-5944|2866.83|MACHINERY| requests. final, regular ideas sleep final accou|
+50|Customer#000000050|9SzDYlkzxByyJ1QeTI o|6|16-658-112-3221|4266.13|MACHINERY|ts. furiously ironic accounts cajole furiously slyly ironic dinos.|
+51|Customer#000000051|uR,wEaiTvo4|12|22-344-885-4251|855.87|FURNITURE|eposits. furiously regular requests integrate carefully packages. furious|
+52|Customer#000000052|7 QOqGqqSy9jfV51BC71jcHJSD0|11|21-186-284-5998|5630.28|HOUSEHOLD|ic platelets use evenly even accounts. stealthy theodolites cajole furiou|
+53|Customer#000000053|HnaxHzTfFTZs8MuCpJyTbZ47Cm4wFOOgib|15|25-168-852-5363|4113.64|HOUSEHOLD|ar accounts are. even foxes are blithely. fluffily pending deposits boost|
+54|Customer#000000054|,k4vf 5vECGWFy,hosTE,|4|14-776-370-4745|868.90|AUTOMOBILE|sual, silent accounts. furiously express accounts cajole special deposits. final, final accounts use furi|
+55|Customer#000000055|zIRBR4KNEl HzaiV3a i9n6elrxzDEh8r8pDom|10|20-180-440-8525|4572.11|MACHINERY|ully unusual packages wake bravely bold packages. unusual requests boost deposits! blithely ironic packages ab|
+56|Customer#000000056|BJYZYJQk4yD5B|10|20-895-685-6920|6530.86|FURNITURE|. notornis wake carefully. carefully fluffy requests are furiously even accounts. slyly expre|
+57|Customer#000000057|97XYbsuOPRXPWU|21|31-835-306-1650|4151.93|AUTOMOBILE|ove the carefully special packages. even, unusual deposits sleep slyly pend|
+58|Customer#000000058|g9ap7Dk1Sv9fcXEWjpMYpBZIRUohi T|13|23-244-493-2508|6478.46|HOUSEHOLD|ideas. ironic ideas affix furiously express, final instructions. regular excuses use quickly e|
+59|Customer#000000059|zLOCP0wh92OtBihgspOGl4|1|11-355-584-3112|3458.60|MACHINERY|ously final packages haggle blithely after the express deposits. furiou|
+5|Customer#000000005|KvpyuHCplrB84WgAiGV6sYpZq7Tj|3|13-750-942-6364|794.47|HOUSEHOLD|n accounts will have to unwind. foxes cajole accor|
+60|Customer#000000060|FyodhjwMChsZmUz7Jz0H|12|22-480-575-5866|2741.87|MACHINERY|latelets. blithely unusual courts boost furiously about the packages. blithely final instruct|
+61|Customer#000000061|9kndve4EAJxhg3veF BfXr7AqOsT39o gtqjaYE|17|27-626-559-8599|1536.24|FURNITURE|egular packages shall have to impress along the |
+62|Customer#000000062|upJK2Dnw13,|7|17-361-978-7059|595.61|MACHINERY|kly special dolphins. pinto beans are slyly. quickly regular accounts are furiously a|
+63|Customer#000000063|IXRSpVWWZraKII|21|31-952-552-9584|9331.13|AUTOMOBILE|ithely even accounts detect slyly above the fluffily ir|
+64|Customer#000000064|MbCeGY20kaKK3oalJD,OT|3|13-558-731-7204|-646.64|BUILDING|structions after the quietly ironic theodolites cajole be|
+65|Customer#000000065|RGT yzQ0y4l0H90P783LG4U95bXQFDRXbWa1sl,X|23|33-733-623-5267|8795.16|AUTOMOBILE|y final foxes serve carefully. theodolites are carefully. pending i|
+66|Customer#000000066|XbsEqXH1ETbJYYtA1A|22|32-213-373-5094|242.77|HOUSEHOLD|le slyly accounts. carefully silent packages benea|
+67|Customer#000000067|rfG0cOgtr5W8 xILkwp9fpCS8|9|19-403-114-4356|8166.59|MACHINERY|indle furiously final, even theodo|
+68|Customer#000000068|o8AibcCRkXvQFh8hF,7o|12|22-918-832-2411|6853.37|HOUSEHOLD| pending pinto beans impress realms. final dependencies |
+69|Customer#000000069|Ltx17nO9Wwhtdbe9QZVxNgP98V7xW97uvSH1prEw|9|19-225-978-5670|1709.28|HOUSEHOLD|thely final ideas around the quickly final dependencies affix carefully quickly final theodolites. final accounts c|
+6|Customer#000000006|sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn|20|30-114-968-4951|7638.57|AUTOMOBILE|tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious|
+70|Customer#000000070|mFowIuhnHjp2GjCiYYavkW kUwOjIaTCQ|22|32-828-107-2832|4867.52|FURNITURE|fter the special asymptotes. ideas after the unusual frets cajole quickly regular pinto be|
+71|Customer#000000071|TlGalgdXWBmMV,6agLyWYDyIz9MKzcY8gl,w6t1B|7|17-710-812-5403|-611.19|HOUSEHOLD|g courts across the regular, final pinto beans are blithely pending ac|
+72|Customer#000000072|putjlmskxE,zs,HqeIA9Wqu7dhgH5BVCwDwHHcf|2|12-759-144-9689|-362.86|FURNITURE|ithely final foxes sleep always quickly bold accounts. final wat|
+73|Customer#000000073|8IhIxreu4Ug6tt5mog4|0|10-473-439-3214|4288.50|BUILDING|usual, unusual packages sleep busily along the furiou|
+74|Customer#000000074|IkJHCA3ZThF7qL7VKcrU nRLl,kylf |4|14-199-862-7209|2764.43|MACHINERY|onic accounts. blithely slow packages would haggle carefully. qui|
+75|Customer#000000075|Dh 6jZ,cwxWLKQfRKkiGrzv6pm|18|28-247-803-9025|6684.10|AUTOMOBILE| instructions cajole even, even deposits. finally bold deposits use above the even pains. slyl|
+76|Customer#000000076|m3sbCvjMOHyaOofH,e UkGPtqc4|0|10-349-718-3044|5745.33|FURNITURE|pecial deposits. ironic ideas boost blithely according to the closely ironic theodolites! furiously final deposits n|
+77|Customer#000000077|4tAE5KdMFGD4byHtXF92vx|17|27-269-357-4674|1738.87|BUILDING|uffily silent requests. carefully ironic asymptotes among the ironic hockey players are carefully bli|
+78|Customer#000000078|HBOta,ZNqpg3U2cSL0kbrftkPwzX|9|19-960-700-9191|7136.97|FURNITURE|ests. blithely bold pinto beans h|
+79|Customer#000000079|n5hH2ftkVRwW8idtD,BmM2|15|25-147-850-4166|5121.28|MACHINERY|es. packages haggle furiously. regular, special requests poach after the quickly express ideas. blithely pending re|
+7|Customer#000000007|TcGe5gaZNgVePxU5kRrvXBfkasDTea|18|28-190-982-9759|9561.95|AUTOMOBILE|ainst the ironic, express theodolites. express, even pinto beans among the exp|
+80|Customer#000000080|K,vtXp8qYB |0|10-267-172-7101|7383.53|FURNITURE|tect among the dependencies. bold accounts engage closely even pinto beans. ca|
+81|Customer#000000081|SH6lPA7JiiNC6dNTrR|20|30-165-277-3269|2023.71|BUILDING|r packages. fluffily ironic requests cajole fluffily. ironically regular theodolit|
+82|Customer#000000082|zhG3EZbap4c992Gj3bK,3Ne,Xn|18|28-159-442-5305|9468.34|AUTOMOBILE|s wake. bravely regular accounts are furiously. regula|
+83|Customer#000000083|HnhTNB5xpnSF20JBH4Ycs6psVnkC3RDf|22|32-817-154-4122|6463.51|BUILDING|ccording to the quickly bold warhorses. final, regular foxes integrate carefully. bold packages nag blithely ev|
+84|Customer#000000084|lpXz6Fwr9945rnbtMc8PlueilS1WmASr CB|11|21-546-818-3802|5174.71|FURNITURE|ly blithe foxes. special asymptotes haggle blithely against the furiously regular depo|
+85|Customer#000000085|siRerlDwiolhYR 8FgksoezycLj|5|15-745-585-8219|3386.64|FURNITURE|ronic ideas use above the slowly pendin|
+86|Customer#000000086|US6EGGHXbTTXPL9SBsxQJsuvy|0|10-677-951-2353|3306.32|HOUSEHOLD|quests. pending dugouts are carefully aroun|
+87|Customer#000000087|hgGhHVSWQl 6jZ6Ev|23|33-869-884-7053|6327.54|FURNITURE|hely ironic requests integrate according to the ironic accounts. slyly regular pla|
+88|Customer#000000088|wtkjBN9eyrFuENSMmMFlJ3e7jE5KXcg|16|26-516-273-2566|8031.44|AUTOMOBILE|s are quickly above the quickly ironic instructions; even requests about the carefully final deposi|
+89|Customer#000000089|dtR, y9JQWUO6FoJExyp8whOU|14|24-394-451-5404|1530.76|FURNITURE|counts are slyly beyond the slyly final accounts. quickly final ideas wake. r|
+8|Customer#000000008|I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5|17|27-147-574-9335|6819.74|BUILDING|among the slyly regular theodolites kindle blithely courts. carefully even theodolites haggle slyly along the ide|
+90|Customer#000000090|QxCzH7VxxYUWwfL7|16|26-603-491-1238|7354.23|BUILDING|sly across the furiously even |
+91|Customer#000000091|S8OMYFrpHwoNHaGBeuS6E 6zhHGZiprw1b7 q|8|18-239-400-3677|4643.14|AUTOMOBILE|onic accounts. fluffily silent pinto beans boost blithely according to the fluffily exp|
+92|Customer#000000092|obP PULk2LH LqNF,K9hcbNqnLAkJVsl5xqSrY,|2|12-446-416-8471|1182.91|MACHINERY|. pinto beans hang slyly final deposits. ac|
+93|Customer#000000093|EHXBr2QGdh|7|17-359-388-5266|2182.52|MACHINERY|press deposits. carefully regular platelets r|
+94|Customer#000000094|IfVNIN9KtkScJ9dUjK3Pg5gY1aFeaXewwf|9|19-953-499-8833|5500.11|HOUSEHOLD|latelets across the bold, final requests sleep according to the fluffily bold accounts. unusual deposits amon|
+95|Customer#000000095|EU0xvmWvOmUUn5J,2z85DQyG7QCJ9Xq7|15|25-923-255-2929|5327.38|MACHINERY|ithely. ruthlessly final requests wake slyly alongside of the furiously silent pinto beans. even the|
+96|Customer#000000096|vWLOrmXhRR|8|18-422-845-1202|6323.92|AUTOMOBILE|press requests believe furiously. carefully final instructions snooze carefully. |
+97|Customer#000000097|OApyejbhJG,0Iw3j rd1M|17|27-588-919-5638|2164.48|AUTOMOBILE|haggle slyly. bold, special ideas are blithely above the thinly bold theo|
+98|Customer#000000098|7yiheXNSpuEAwbswDW|12|22-885-845-6889|-551.37|BUILDING|ages. furiously pending accounts are quickly carefully final foxes: busily pe|
+99|Customer#000000099|szsrOiPtCHVS97Lt|15|25-515-237-9232|4088.65|HOUSEHOLD|cajole slyly about the regular theodolites! furiously bold requests nag along the pending, regular packages. somas|
+9|Customer#000000009|xKiAFTjUsCuxfeleNqefumTrjS|8|18-338-906-3675|8324.07|FURNITURE|r theodolites according to the requests wake thinly excuses: pending requests haggle furiousl|
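Note on the ordering: part-0 holds the same 150 customer rows as the input, reordered by plain lexicographic comparison of whole lines (char-wise, which is byte-wise for this ASCII data). That is why "100|..." through "109|..." precede "10|...", and "19|..." precedes "1|...": a digit sorts below '|'. A hypothetical helper that regenerates the file under that assumption; the input path is illustrative, not taken from the patch:

    import java.nio.charset.StandardCharsets;
    import java.nio.file.*;
    import java.util.Collections;
    import java.util.List;

    public class RegenerateExpected {
        public static void main(String[] args) throws Exception {
            // Assumed location of the unsorted customer rows shown earlier.
            Path in = Paths.get("src/test/resources/data/customer.tbl");
            List<String> rows = Files.readAllLines(in, StandardCharsets.UTF_8);
            Collections.sort(rows); // String.compareTo: lexicographic line order
            Files.write(Paths.get("src/test/resources/expected/part-0"),
                    rows, StandardCharsets.UTF_8);
        }
    }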
diff --git a/genomix/genomix-core/src/test/resources/hadoop/conf/core-site.xml b/genomix/genomix-core/src/test/resources/hadoop/conf/core-site.xml
new file mode 100644
index 0000000..47dfac5
--- /dev/null
+++ b/genomix/genomix-core/src/test/resources/hadoop/conf/core-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+    <name>fs.default.name</name>
+    <value>hdfs://127.0.0.1:31888</value>
+</property>
+<property>
+    <name>hadoop.tmp.dir</name>
+    <value>/tmp/hadoop</value>
+</property>
+
+
+</configuration>
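This core-site.xml pins the test filesystem to a local HDFS namenode on 127.0.0.1:31888 and keeps scratch state under /tmp/hadoop. One plausible way a test fixture could consume these files with the 0.20-era mini-cluster API; treat the constructor choice and paths as a sketch, not code from this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class MiniClusterSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.addResource(new Path("src/test/resources/hadoop/conf/core-site.xml"));
            conf.addResource(new Path("src/test/resources/hadoop/conf/hdfs-site.xml"));
            // Port 31888 matches fs.default.name above; 2 datanodes, formatted on start,
            // no rack awareness.
            MiniDFSCluster cluster =
                    new MiniDFSCluster(31888, conf, 2, true, true, (String[]) null);
            FileSystem dfs = cluster.getFileSystem();
            System.out.println("mini DFS up at " + dfs.getUri());
            cluster.shutdown();
        }
    }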
diff --git a/genomix/genomix-core/src/test/resources/hadoop/conf/hdfs-site.xml b/genomix/genomix-core/src/test/resources/hadoop/conf/hdfs-site.xml
new file mode 100644
index 0000000..8d29b1d
--- /dev/null
+++ b/genomix/genomix-core/src/test/resources/hadoop/conf/hdfs-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+    <name>dfs.replication</name>
+    <value>1</value>
+</property>
+
+<property>
+    <name>dfs.block.size</name>
+    <value>65536</value>
+</property>
+
+</configuration>
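hdfs-site.xml shrinks dfs.block.size to 65536 bytes (64 KB, versus the 64 MB default in Hadoop 0.20), so a file of S bytes occupies ceil(S / 65536) blocks: the roughly 25 KB customer file above stays in one block, while a 1 MB input would span 1048576 / 65536 = 16, exercising multi-block reads without large fixtures. dfs.replication=1 is the natural setting for a single-node mini cluster.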
diff --git a/genomix/genomix-core/src/test/resources/hadoop/conf/log4j.properties b/genomix/genomix-core/src/test/resources/hadoop/conf/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/genomix/genomix-core/src/test/resources/hadoop/conf/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
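With hadoop.root.logger defaulting to FATAL,console, the embedded Hadoop daemons stay almost silent during test runs. Because the first lines of the file declare these values as system-property defaults, a run can be made verbose without editing it, e.g. by passing -Dhadoop.root.logger=DEBUG,console to the test JVM.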
diff --git a/genomix/genomix-core/src/test/resources/hadoop/conf/mapred-site.xml b/genomix/genomix-core/src/test/resources/hadoop/conf/mapred-site.xml
new file mode 100644
index 0000000..39b6505
--- /dev/null
+++ b/genomix/genomix-core/src/test/resources/hadoop/conf/mapred-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+  <property>
+    <name>mapred.job.tracker</name>
+    <value>localhost:29007</value>
+  </property>
+  <property>
+    <name>mapred.tasktracker.map.tasks.maximum</name>
+    <value>20</value>
+  </property>
+  <property>
+    <name>mapred.tasktracker.reduce.tasks.maximum</name>
+    <value>20</value>
+  </property>
+  <property>
+    <name>mapred.max.split.size</name>
+    <value>2048</value>
+  </property>
+
+</configuration>
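mapred-site.xml points the tests at a local jobtracker on localhost:29007 and caps input splits at 2048 bytes via mapred.max.split.size, so even the small customer file fans out: roughly 25 KB divided into 2048-byte splits comes to about 13 map tasks, and the 20 map and 20 reduce slots per tasktracker leave enough headroom to run that fan-out concurrently on one machine.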