Used the HDFS descriptor from hyracks-hdfs.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2872 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
index bb868ce..a0a6b9a 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -4,44 +4,46 @@
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
-public class Integer64NormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
- private static final long serialVersionUID = 8735044913496854551L;
+public class Integer64NormalizedKeyComputerFactory implements
+ INormalizedKeyComputerFactory {
+ private static final long serialVersionUID = 8735044913496854551L;
- @Override
- public INormalizedKeyComputer createNormalizedKeyComputer() {
- return new INormalizedKeyComputer() {
- private static final int POSTIVE_LONG_MASK = (3 << 30);
- private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
- private static final int NEGATIVE_LONG_MASK = (0 << 30);
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+ return new INormalizedKeyComputer() {
+ private static final int POSITIVE_LONG_MASK = (3 << 30);
+ private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
+ private static final int NEGATIVE_LONG_MASK = (0 << 30);
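+ // The top two bits of the normalized key record the long's range
+ // (negative, non-negative within 32 bits, larger); the rest come from
+ // the value itself, so comparing keys approximates comparing the longs
+ // without reading all eight bytes.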
- @Override
- public int normalize(byte[] bytes, int start, int length) {
- long value = Integer64SerializerDeserializer.getLong(bytes, start);
- int highValue = (int) (value >> 32);
- if (highValue > 0) {
- /** * larger than Integer.MAX */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= POSTIVE_LONG_MASK;
- return highNmk;
- } else if (highValue == 0) {
- /** * smaller than Integer.MAX but >=0 */
- int lowNmk = (int) value;
- lowNmk >>= 2;
- lowNmk |= NON_NEGATIVE_INT_MASK;
- return lowNmk;
- } else {
- /** * less than 0: have not optimized for that */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= NEGATIVE_LONG_MASK;
- return highNmk;
- }
- }
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ long value = Integer64SerializerDeserializer.getLong(bytes,
+ start);
+ int highValue = (int) (value >> 32);
+ if (highValue > 0) {
+ /* larger than Integer.MAX_VALUE */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= POSITIVE_LONG_MASK;
+ return highNmk;
+ } else if (highValue == 0) {
+ /* non-negative, fits in 32 bits */
+ int lowNmk = (int) value;
+ lowNmk >>= 2;
+ lowNmk |= NON_NEGATIVE_INT_MASK;
+ return lowNmk;
+ } else {
+ /* less than 0: not optimized for this case */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= NEGATIVE_LONG_MASK;
+ return highNmk;
+ }
+ }
- private int getKey(int value) {
- return value ^ Integer.MIN_VALUE;
- }
- };
- }
+ private int getKey(int value) {
+ return value ^ Integer.MIN_VALUE;
+ }
+ };
+ }
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
index f3a3e72..ac8a6bc 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
@@ -3,60 +3,59 @@
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-public class VLongNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
- private static final long serialVersionUID = 8735044913496854551L;
+public class VLongNormalizedKeyComputerFactory implements
+ INormalizedKeyComputerFactory {
+ private static final long serialVersionUID = 8735044913496854551L;
- @Override
- public INormalizedKeyComputer createNormalizedKeyComputer() {
- return new INormalizedKeyComputer() {
- private static final int POSTIVE_LONG_MASK = (3 << 30);
- private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
- private static final int NEGATIVE_LONG_MASK = (0 << 30);
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+ return new INormalizedKeyComputer() {
+ private static final int POSITIVE_LONG_MASK = (3 << 30);
+ private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
+ private static final int NEGATIVE_LONG_MASK = (0 << 30);
-
- private long getLong(byte[] bytes, int offset) {
- int l = (int)Math.ceil((double)bytes[offset]/4.0);
- int n = (l<8)?l:8;
-
- long r = 0;
- for(int i = 0 ; i < n ; i++){
- r <<= 8;
- r += (long) (bytes[offset + 1] & 0xff);
- }
-
- return r;
- }
-
-
- @Override
- public int normalize(byte[] bytes, int start, int length) {
- long value = getLong(bytes, start);
-
- int highValue = (int) (value >> 32);
- if (highValue > 0) {
- /** * larger than Integer.MAX */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= POSTIVE_LONG_MASK;
- return highNmk;
- } else if (highValue == 0) {
- /** * smaller than Integer.MAX but >=0 */
- int lowNmk = (int) value;
- lowNmk >>= 2;
- lowNmk |= NON_NEGATIVE_INT_MASK;
- return lowNmk;
- } else {
- /** * less than 0: have not optimized for that */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= NEGATIVE_LONG_MASK;
- return highNmk;
- }
- }
+ private long getLong(byte[] bytes, int offset) {
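+ // bytes[offset] holds the k-mer length in bases (two bits per base),
+ // so ceil(k / 4.0) payload bytes follow; at most eight of them fit in
+ // the returned long, most-significant byte first.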
+ int l = (int) Math.ceil((double) bytes[offset] / 4.0);
+ int n = (l < 8) ? l : 8;
- private int getKey(int value) {
- return value ^ Integer.MIN_VALUE;
- }
- };
- }
+ long r = 0;
+ for (int i = 0; i < n; i++) {
+ r <<= 8;
+ r += (long) (bytes[offset + 1 + i] & 0xff);
+ }
+
+ return r;
+ }
+
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ long value = getLong(bytes, start);
+
+ int highValue = (int) (value >> 32);
+ if (highValue > 0) {
+ /* larger than Integer.MAX_VALUE */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= POSITIVE_LONG_MASK;
+ return highNmk;
+ } else if (highValue == 0) {
+ /* non-negative, fits in 32 bits */
+ int lowNmk = (int) value;
+ lowNmk >>= 2;
+ lowNmk |= NON_NEGATIVE_INT_MASK;
+ return lowNmk;
+ } else {
+ /* less than 0: not optimized for this case */
+ int highNmk = getKey(highValue);
+ highNmk >>= 2;
+ highNmk |= NEGATIVE_LONG_MASK;
+ return highNmk;
+ }
+ }
+
+ private int getKey(int value) {
+ return value ^ Integer.MIN_VALUE;
+ }
+ };
+ }
}
\ No newline at end of file
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
index 90bd12f..4078718 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
@@ -6,30 +6,32 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-public class KmerHashPartitioncomputerFactory implements ITuplePartitionComputerFactory {
+public class KmerHashPartitioncomputerFactory implements
+ ITuplePartitionComputerFactory {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public KmerHashPartitioncomputerFactory() {
- }
+ public KmerHashPartitioncomputerFactory() {
+ }
- @Override
- public ITuplePartitionComputer createPartitioner() {
- return new ITuplePartitionComputer() {
- @Override
- public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) {
- if (nParts == 1) {
- return 0;
- }
- int startOffset = accessor.getTupleStartOffset(tIndex);
- int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);
- int slotLength = accessor.getFieldSlotsLength();
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ return new ITuplePartitionComputer() {
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex,
+ int nParts) {
+ if (nParts == 1) {
+ return 0;
+ }
+ int startOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);
+ int slotLength = accessor.getFieldSlotsLength();
- ByteBuffer buf = accessor.getBuffer();
- buf.position(startOffset + fieldOffset + slotLength);
- long l = accessor.getBuffer().getLong();
- return (int) (l % nParts);
- }
- };
- }
+ ByteBuffer buf = accessor.getBuffer();
+ buf.position(startOffset + fieldOffset + slotLength);
+ long l = accessor.getBuffer().getLong();
+ int p = (int) (l % nParts);
+ // l may be negative once the packed k-mer sets the sign bit
+ return p < 0 ? p + nParts : p;
+ }
+ };
+ }
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java
index 41dcb07..d88c0a0 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/serde/ByteSerializerDeserializer.java
@@ -7,39 +7,41 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-public class ByteSerializerDeserializer implements ISerializerDeserializer<Byte> {
+public class ByteSerializerDeserializer implements
+ ISerializerDeserializer<Byte> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public static final ByteSerializerDeserializer INSTANCE = new ByteSerializerDeserializer();
+ public static final ByteSerializerDeserializer INSTANCE = new ByteSerializerDeserializer();
- private ByteSerializerDeserializer() {
- }
+ private ByteSerializerDeserializer() {
+ }
- @Override
- public Byte deserialize(DataInput in) throws HyracksDataException {
- try {
- return in.readByte();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ @Override
+ public Byte deserialize(DataInput in) throws HyracksDataException {
+ try {
+ return in.readByte();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void serialize(Byte instance, DataOutput out) throws HyracksDataException {
- try {
- out.writeByte(instance.intValue());
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ @Override
+ public void serialize(Byte instance, DataOutput out)
+ throws HyracksDataException {
+ try {
+ out.writeByte(instance.intValue());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- public static byte getByte(byte[] bytes, int offset) {
- return bytes[offset];
- }
+ public static byte getByte(byte[] bytes, int offset) {
+ return bytes[offset];
+ }
- public static void putByte(byte val, byte[] bytes, int offset) {
- bytes[offset] = val;
- }
+ public static void putByte(byte val, byte[] bytes, int offset) {
+ bytes[offset] = val;
+ }
}
\ No newline at end of file
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java
index 9685dd1..6477e14 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java
@@ -5,8 +5,7 @@
import edu.uci.ics.hyracks.data.std.api.IHashable;
import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
-public class LongBinaryHashFunctionFamily implements
- IBinaryHashFunctionFamily {
+public class LongBinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
private static final long serialVersionUID = 1L;
@Override
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
index 9092517..661559f 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
@@ -1,4 +1,3 @@
-
package edu.uci.ics.genomix.data.std.accessors;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
@@ -6,37 +5,36 @@
import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
public class LongHashFunctionFamily implements IBinaryHashFunctionFamily {
- public static final IBinaryHashFunctionFamily INSTANCE = new LongHashFunctionFamily();
+ public static final IBinaryHashFunctionFamily INSTANCE = new LongHashFunctionFamily();
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- static final int[] primeCoefficents = { 31, 23, 53, 97, 71, 337, 11, 877, 3, 29 };
+ static final int[] primeCoefficients = { 31, 23, 53, 97, 71, 337, 11, 877,
+ 3, 29 };
- private LongHashFunctionFamily() {
- }
+ private LongHashFunctionFamily() {
+ }
- @Override
- public IBinaryHashFunction createBinaryHashFunction(int seed) {
- final int coefficient = primeCoefficents[seed % primeCoefficents.length];
- final int r = primeCoefficents[(seed + 1) % primeCoefficents.length];
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction(int seed) {
+ final int coefficient = primeCoefficients[seed % primeCoefficients.length];
+ final int r = primeCoefficients[(seed + 1) % primeCoefficients.length];
- return new IBinaryHashFunction() {
- @Override
- public int hash(byte[] bytes, int offset, int length) {
- int h = 0;
- int utflen = UTF8StringPointable.getUTFLength(bytes, offset);
- int sStart = offset + 2;
- int c = 0;
+ return new IBinaryHashFunction() {
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ int h = 0;
+ int utflen = UTF8StringPointable.getUTFLength(bytes, offset);
+ int sStart = offset + 2;
+ int c = 0;
- while (c < utflen) {
- char ch = UTF8StringPointable.charAt(bytes, sStart + c);
- h = (coefficient * h + ch) % r;
- c += UTF8StringPointable.charSize(bytes, sStart + c);
- }
- return h;
- }
- };
- }
+ while (c < utflen) {
+ char ch = UTF8StringPointable.charAt(bytes, sStart + c);
+ h = (coefficient * h + ch) % r;
+ c += UTF8StringPointable.charSize(bytes, sStart + c);
+ }
+ return h;
+ }
+ };
+ }
}
-
-
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
index 361e036..b1db6f2 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
@@ -3,72 +3,75 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-public class MurmurHash3BinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
-
- public static final IBinaryHashFunctionFamily INSTANCE = new MurmurHash3BinaryHashFunctionFamily();
-
- private static final long serialVersionUID = 1L;
-
- private MurmurHash3BinaryHashFunctionFamily(){}
+public class MurmurHash3BinaryHashFunctionFamily implements
+ IBinaryHashFunctionFamily {
- private static final int C1 = 0xcc9e2d51;
- private static final int C2 = 0x1b873593;
- private static final int C3 = 5;
- private static final int C4 = 0xe6546b64;
- private static final int C5 = 0x85ebca6b;
- private static final int C6 = 0xc2b2ae35;
+ public static final IBinaryHashFunctionFamily INSTANCE = new MurmurHash3BinaryHashFunctionFamily();
- @Override
- public IBinaryHashFunction createBinaryHashFunction(final int seed) {
- return new IBinaryHashFunction() {
- @Override
- public int hash(byte[] bytes, int offset, int length) {
- int h = seed;
- int p = offset;
- int remain = length;
- while (remain > 4) {
- int k = ((int) bytes[p]) | (((int) bytes[p + 1]) << 8) | (((int) bytes[p + 2]) << 16)
- | (((int) bytes[p + 3]) << 24);
- k *= C1;
- k = Integer.rotateLeft(k, 15);
- k *= C2;
- h ^= k;
- h = Integer.rotateLeft(h, 13);
- h = h * C3 + C4;
- p += 4;
- remain -= 4;
- }
- int k = 0;
- for(int i = 0; remain > 0; i += 8){
- k ^= bytes[p++] << i;
- remain--;
- }
- k *= C1;
- k = Integer.rotateLeft(k, 15);
- k *= C2;
- h ^= k;
- // switch (remain) {
- // case 3:
- // k = bytes[p++];
- // case 2:
- // k = (k << 8) | bytes[p++];
- // case 1:
- // k = (k << 8) | bytes[p++];
- // k *= C1;
- // k = Integer.rotateLeft(k, 15);
- // k *= C2;
- // h ^= k;
- // h = Integer.rotateLeft(h, 13);
- // h = h * C3 + C4;
- // }
- h ^= length;
- h ^= (h >>> 16);
- h *= C5;
- h ^= (h >>> 13);
- h *= C6;
- h ^= (h >>> 16);
- return h;
- }
- };
- }
+ private static final long serialVersionUID = 1L;
+
+ private MurmurHash3BinaryHashFunctionFamily() {
+ }
+
+ private static final int C1 = 0xcc9e2d51;
+ private static final int C2 = 0x1b873593;
+ private static final int C3 = 5;
+ private static final int C4 = 0xe6546b64;
+ private static final int C5 = 0x85ebca6b;
+ private static final int C6 = 0xc2b2ae35;
+
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction(final int seed) {
+ return new IBinaryHashFunction() {
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
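+ // MurmurHash3 x86 32-bit: mix 4-byte blocks, then the tail bytes,
+ // then apply the standard finalization (avalanche) steps.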
+ int h = seed;
+ int p = offset;
+ int remain = length;
+ while (remain > 4) {
+ int k = ((int) bytes[p]) | (((int) bytes[p + 1]) << 8)
+ | (((int) bytes[p + 2]) << 16)
+ | (((int) bytes[p + 3]) << 24);
+ k *= C1;
+ k = Integer.rotateLeft(k, 15);
+ k *= C2;
+ h ^= k;
+ h = Integer.rotateLeft(h, 13);
+ h = h * C3 + C4;
+ p += 4;
+ remain -= 4;
+ }
+ int k = 0;
+ for (int i = 0; remain > 0; i += 8) {
+ k ^= bytes[p++] << i;
+ remain--;
+ }
+ k *= C1;
+ k = Integer.rotateLeft(k, 15);
+ k *= C2;
+ h ^= k;
+ // switch (remain) {
+ // case 3:
+ // k = bytes[p++];
+ // case 2:
+ // k = (k << 8) | bytes[p++];
+ // case 1:
+ // k = (k << 8) | bytes[p++];
+ // k *= C1;
+ // k = Integer.rotateLeft(k, 15);
+ // k *= C2;
+ // h ^= k;
+ // h = Integer.rotateLeft(h, 13);
+ // h = h * C3 + C4;
+ // }
+ h ^= length;
+ h ^= (h >>> 16);
+ h *= C5;
+ h ^= (h >>> 13);
+ h *= C6;
+ h ^= (h >>> 16);
+ return h;
+ }
+ };
+ }
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java
index 2cd66e9..b9a0443 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java
@@ -5,8 +5,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import edu.uci.ics.hyracks.data.std.api.IHashable;
-public class VLongBinaryHashFunctionFamily implements
- IBinaryHashFunctionFamily {
+public class VLongBinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
private static final long serialVersionUID = 1L;
@Override
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
index 65978d7..77cfd6e 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
@@ -8,134 +8,130 @@
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
-public final class VLongPointable extends AbstractPointable implements IHashable, IComparable, INumeric {
+public final class VLongPointable extends AbstractPointable implements
+ IHashable, IComparable, INumeric {
static private int max = 65535;
- public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
- private static final long serialVersionUID = 1L;
+ public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
+ private static final long serialVersionUID = 1L;
- @Override
- public boolean isFixedLength() {
- return false;
- }
+ @Override
+ public boolean isFixedLength() {
+ return false;
+ }
- @Override
- public int getFixedLength() {
- return -1;
- }
- };
+ @Override
+ public int getFixedLength() {
+ return -1;
+ }
+ };
- public static final IPointableFactory FACTORY = new IPointableFactory() {
- private static final long serialVersionUID = 1L;
+ public static final IPointableFactory FACTORY = new IPointableFactory() {
+ private static final long serialVersionUID = 1L;
- @Override
- public IPointable createPointable() {
- return new VLongPointable();
- }
+ @Override
+ public IPointable createPointable() {
+ return new VLongPointable();
+ }
- @Override
- public ITypeTraits getTypeTraits() {
- return TYPE_TRAITS;
- }
- };
+ @Override
+ public ITypeTraits getTypeTraits() {
+ return TYPE_TRAITS;
+ }
+ };
+ public static long getLong(byte[] bytes, int start) {
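+ // Same variable-length decoding as VLongNormalizedKeyComputerFactory:
+ // the first byte is the base count, the payload bytes follow.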
+ int l = (int) Math.ceil((double) bytes[start] / 4.0);
+ int n = (l < 8) ? l : 8;
+ long r = 0;
+ for (int i = 0; i < n; i++) {
+ r <<= 8;
+ r += (long) (bytes[start + 1 + i] & 0xff);
+ }
- public static long getLong(byte[] bytes, int start) {
- int l = (int)Math.ceil((double)bytes[start]/4.0);
- int n = (l<8)?l:8;
-
- long r = 0;
- for(int i = 0 ; i < n ; i++){
- r <<= 8;
- r += (long) (bytes[start + 1] & 0xff);
- }
-
- return r;
- }
-
- public long getLong() {
- return getLong(bytes, start);
- }
-
- public byte[] postIncrement() {
- int i = start + 1;
- int l = (int)Math.ceil(bytes[start]/4);
- while(i <= start+l){
- bytes[i] += 1;
- if(bytes[i] != 0){
- break;
- }
- }
- return bytes;
- }
+ return r;
+ }
- @Override
- public int compareTo(IPointable pointer) {
- return compareTo(pointer.getByteArray(), pointer.getStartOffset(), pointer.getLength());
- }
+ public long getLong() {
+ return getLong(bytes, start);
+ }
- @Override
- public int compareTo(byte[] bytes, int start, int length) {
-
- int be = this.start;
- int n = (this.bytes[be] < bytes[start])? this.bytes[be] : bytes[start];
- int l = (int)Math.ceil(n/4);
- for(int i = 0 ; i <= l ; i++){
- if(this.bytes[i]<bytes[i]){
- return -1;
- }
- else if(this.bytes[i]>bytes[i]){
- return 1;
- }
-
- }
-
- return (this.bytes[be] < bytes[start])? -1 : ((this.bytes[be] > bytes[start])?1:0);
- }
+ public byte[] postIncrement() {
+ int l = (int) Math.ceil((double) bytes[start] / 4.0);
+ // getLong treats bytes[start + 1] as the most-significant byte, so
+ // the increment carries from the last payload byte backward
+ int i = start + l;
+ while (i > start) {
+ bytes[i] += 1;
+ if (bytes[i] != 0) {
+ break;
+ }
+ i--;
+ }
+ return bytes;
+ }
- @Override
- public int hash() {//BKDRHash
- int seed = 131; // 31 131 1313 13131 131313 etc..
- int hash = 0;
- int l = (int)Math.ceil((double)bytes[start]/4.0);
- for(int i = start + 1 ; i <= start + l ; i++)
- {
- hash = hash * seed + bytes[i];
- }
- return (hash & 0x7FFFFFFF);
- }
+ @Override
+ public int compareTo(IPointable pointer) {
+ return compareTo(pointer.getByteArray(), pointer.getStartOffset(),
+ pointer.getLength());
+ }
- @Override
- public byte byteValue() {
- return (byte) bytes[start+1];
- }
+ @Override
+ public int compareTo(byte[] bytes, int start, int length) {
- @Override
- public short shortValue() {
-
- return (short) ((bytes[start+2] & 0xff) << 8 + bytes[start+1] & 0xff);
- }
+ int be = this.start;
+ int n = (this.bytes[be] < bytes[start]) ? this.bytes[be] : bytes[start];
+ int l = (int) Math.ceil((double) n / 4.0);
+ for (int i = 1; i <= l; i++) {
+ // compare the payload bytes at their real offsets, as unsigned values
+ int a = this.bytes[be + i] & 0xff;
+ int b = bytes[start + i] & 0xff;
+ if (a < b) {
+ return -1;
+ } else if (a > b) {
+ return 1;
+ }
- @Override
- public int intValue() {
- return (int) ( (bytes[start + 4] & 0xff) << 24 + (bytes[start + 3] & 0xff) <<16 +
- (bytes[start + 2] & 0xff) << 8 + bytes[start + 1] & 0xff);
- }
+ }
-
-
- @Override
- public long longValue() {
- return getLong();
- }
+ return (this.bytes[be] < bytes[start]) ? -1
+ : ((this.bytes[be] > bytes[start]) ? 1 : 0);
+ }
- @Override
- public float floatValue() {
- return getLong();
- }
+ @Override
+ public int hash() {// BKDRHash
+ int seed = 131; // 31 131 1313 13131 131313 etc..
+ int hash = 0;
+ int l = (int) Math.ceil((double) bytes[start] / 4.0);
+ for (int i = start + 1; i <= start + l; i++) {
+ hash = hash * seed + bytes[i];
+ }
+ return (hash & 0x7FFFFFFF);
+ }
- @Override
- public double doubleValue() {
- return getLong();
- }
+ @Override
+ public byte byteValue() {
+ return (byte) bytes[start + 1];
+ }
+
+ @Override
+ public short shortValue() {
+
+ return (short) (((bytes[start + 2] & 0xff) << 8) + (bytes[start + 1] & 0xff));
+ }
+
+ @Override
+ public int intValue() {
+ return (int) (((bytes[start + 4] & 0xff) << 24) + ((bytes[start + 3] & 0xff) << 16) + ((bytes[start + 2] & 0xff) << 8) + (bytes[start + 1] & 0xff));
+ }
+
+ @Override
+ public long longValue() {
+ return getLong();
+ }
+
+ @Override
+ public float floatValue() {
+ return getLong();
+ }
+
+ @Override
+ public double doubleValue() {
+ return getLong();
+ }
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
index 78c37b7..20c6a28 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -7,18 +7,20 @@
import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
- private static final long serialVersionUID = 1L;
- private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
- private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+public class ConnectorPolicyAssignmentPolicy implements
+ IConnectorPolicyAssignmentPolicy {
+ private static final long serialVersionUID = 1L;
+ private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+ private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
- @Override
- public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
- int[] fanouts) {
- if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
- return senderSideMaterializePolicy;
- } else {
- return pipeliningPolicy;
- }
- }
+ @Override
+ public IConnectorPolicy getConnectorPolicyAssignment(
+ IConnectorDescriptor c, int nProducers, int nConsumers,
+ int[] fanouts) {
+ if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+ return senderSideMaterializePolicy;
+ } else {
+ return pipeliningPolicy;
+ }
+ }
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index 3f0f194..ebc5346 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -22,263 +22,278 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-public class FileScanDescriptor extends AbstractSingleActivityOperatorDescriptor {
+public class FileScanDescriptor extends
+ AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
- private int k;
- private Path [] filesplit = null ;
- private String pathSurfix ;
- private int byteNum;
+ private static final long serialVersionUID = 1L;
+ private int k;
+ private Path[] filesplit = null;
+ private String pathSuffix;
+ private int byteNum;
- public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k, String path) {
- super(spec, 0, 1);
- this.k = k;
- this.pathSurfix = path;
-
- byteNum = (byte)Math.ceil((double)k/4.0);
- //recordDescriptors[0] = news RecordDescriptor(
- // new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
- recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {
- null, null});
- }
+ public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k,
+ String path) {
+ super(spec, 0, 1);
+ this.k = k;
+ this.pathSuffix = path;
- public FileScanDescriptor(JobSpecification jobSpec, int kmers,
- Path[] inputPaths) {
- super(jobSpec, 0, 1);
- this.k = k;
- this.filesplit = inputPaths;
- this.pathSurfix = inputPaths[0].toString();
- //recordDescriptors[0] = news RecordDescriptor(
- // new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
- recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {
- null, ByteSerializerDeserializer.INSTANCE });
+ byteNum = (int) Math.ceil((double) k / 4.0);
+ // recordDescriptors[0] = new RecordDescriptor(
+ // new ISerializerDeserializer[] {
+ // UTF8StringSerializerDeserializer.INSTANCE });
+ recordDescriptors[0] = new RecordDescriptor(
+ new ISerializerDeserializer[] { null, null });
}
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ public FileScanDescriptor(JobSpecification jobSpec, int kmers,
+ Path[] inputPaths) {
+ super(jobSpec, 0, 1);
+ this.k = kmers;
+ this.filesplit = inputPaths;
+ this.pathSuffix = inputPaths[0].toString();
+ byteNum = (int) Math.ceil((double) kmers / 4.0);
+ // recordDescriptors[0] = new RecordDescriptor(
+ // new ISerializerDeserializer[] {
+ // UTF8StringSerializerDeserializer.INSTANCE });
+ recordDescriptors[0] = new RecordDescriptor(
+ new ISerializerDeserializer[] { null,
+ ByteSerializerDeserializer.INSTANCE });
+ }
- final int temp = partition;
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
- // TODO Auto-generated method stub
- return (IOperatorNodePushable) new AbstractUnaryOutputSourceOperatorNodePushable() {
- private ArrayTupleBuilder tupleBuilder;
- private ByteBuffer outputBuffer;
- private FrameTupleAppender outputAppender;
+ final int temp = partition;
- @SuppressWarnings("resource")
+ // TODO Auto-generated method stub
+ return (IOperatorNodePushable) new AbstractUnaryOutputSourceOperatorNodePushable() {
+ private ArrayTupleBuilder tupleBuilder;
+ private ByteBuffer outputBuffer;
+ private FrameTupleAppender outputAppender;
+
+ @SuppressWarnings("resource")
@Override
- public void initialize() {
+ public void initialize() {
- tupleBuilder = new ArrayTupleBuilder(2);
- outputBuffer = ctx.allocateFrame();
- outputAppender = new FrameTupleAppender(ctx.getFrameSize());
- outputAppender.reset(outputBuffer, true);
- try {// one try with multiple catch?
- writer.open();
- String s = pathSurfix + String.valueOf(temp);
-
- File tf = new File(s);
-
- File[] fa = tf.listFiles();
+ tupleBuilder = new ArrayTupleBuilder(2);
+ outputBuffer = ctx.allocateFrame();
+ outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outputAppender.reset(outputBuffer, true);
+ try {// one try with multiple catch?
+ writer.open();
+ String s = pathSuffix + String.valueOf(temp);
- for(int i = 0 ; i < fa.length ; i++){
- BufferedReader readsfile = new BufferedReader(new InputStreamReader(new FileInputStream(fa[i])));
- String read = readsfile.readLine();
- while (read != null) {
- read = readsfile.readLine();
- SplitReads(read.getBytes());
- //read.getBytes();
+ File tf = new File(s);
- read = readsfile.readLine();
- read = readsfile.readLine();
+ File[] fa = tf.listFiles();
- read = readsfile.readLine();
- }
- }
- if (outputAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outputBuffer, writer);
- }
- outputAppender = null;
- outputBuffer = null;
- // sort code for external sort here?
- writer.close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- throw new IllegalStateException(e);
- }
- }
+ for (int i = 0; i < fa.length; i++) {
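+ // Each read is assumed to be four-line FASTQ: header, sequence,
+ // '+', quality. Only the sequence line is split into k-mers.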
+ BufferedReader readsfile = new BufferedReader(
+ new InputStreamReader(
+ new FileInputStream(fa[i])));
+ String read = readsfile.readLine();
+ while (read != null) {
+ read = readsfile.readLine();
+ SplitReads(read.getBytes());
+ // read.getBytes();
- private byte[] CompressKmer(byte[] array, int start) {
- // a: 00; c: 01; G: 10; T: 11
-
- byte[] bytes = new byte[byteNum+1];
- bytes[0] = (byte) k;
-
- byte l = 0;
- int count = 0;
- int bcount = 0;
-
- for (int i = start; i < start + k; i++) {
- l <<= 2;
- switch (array[i]) {
- case 'A':
- case 'a':
- l |= 0;
- break;
- case 'C':
- case 'c':
- l |= 1;
- break;
- case 'G':
- case 'g':
- l |= 2;
- break;
- case 'T':
- case 't':
- l |= 3;
- break;
- }
- count += 2;
- if(count%8==0){
- bcount += 1;
- bytes[bcount] = l;
- count = 0;
- }
- }
- bytes[bcount + 1] = l;
- return bytes;
- }
+ read = readsfile.readLine();
+ read = readsfile.readLine();
- private byte GetBitmap(byte t) {
- byte r = 0;
- switch (t) {
- case 'A':
- case 'a':
- r = 1;
- break;
- case 'C':
- case 'c':
- r = 2;
- break;
- case 'G':
- case 'g':
- r = 4;
- break;
- case 'T':
- case 't':
- r = 8;
- break;
- }
- return r;
- }
+ read = readsfile.readLine();
+ }
+ readsfile.close();
+ }
+ if (outputAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+ outputAppender = null;
+ outputBuffer = null;
+ // sort code for external sort here?
+ writer.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ throw new IllegalStateException(e);
+ }
+ }
- private byte ConvertSymbol(byte t) {
- byte r = 0;
- switch (t) {
- case 'A':
- case 'a':
- r = 0;
- break;
- case 'C':
- case 'c':
- r = 1;
- break;
- case 'G':
- case 'g':
- r = 2;
- break;
- case 'T':
- case 't':
- r = 3;
- break;
- }
- return r;
- }
-
- void MoveKmer(byte[] bytes, byte c){
- byte filter0 = (byte) 0xC0;
- byte filter1 = (byte) 0xFC;
- byte filter2 = 0;
-
- int r = byteNum*8 - 2*k;
- r = 8 - r;
- for(int i = 0 ; i < r ; i++){
- filter2 <<= 1;
- filter2 |= 1;
- }
+ private byte[] CompressKmer(byte[] array, int start) {
+ // a: 00; c: 01; G: 10; T: 11
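+ // Layout: bytes[0] stores k, then four bases per byte; a final
+ // partial byte keeps its bases in the low-order bits.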
- int i = byteNum;
- bytes[i] <<= 2;
- bytes[i] &= filter2;
- i -= 1;
- while(i > 0){
- byte f = (byte) (bytes[i] & filter0);
- f >>= 6;
- bytes[i+1] |= f;
- bytes[i] <<= 2;
- bytes[i] &= filter1;
- }
- bytes[i+1] |= ConvertSymbol(c);
- }
+ byte[] bytes = new byte[byteNum + 1];
+ bytes[0] = (byte) k;
- private void SplitReads(byte[] array) {
- try {
- byte[] bytes=null;
-
- byte pre = 0, next = 0;
- byte r;
+ byte l = 0;
+ int count = 0;
+ int bcount = 0;
- for (int i = 0; i < array.length - k + 1; i++) {
- if (0 == i) {
- bytes = CompressKmer(array, i);
- } else {
- MoveKmer(bytes, array[i + k - 1]);
- /*l <<= 2;
- l &= window;
- l |= ConvertSymbol(array[i + k - 1]);*/
- pre = GetBitmap(array[i - 1]);
- }
- if (i + k != array.length) {
- next = GetBitmap(array[i + k]);
- }
+ for (int i = start; i < start + k; i++) {
+ l <<= 2;
+ switch (array[i]) {
+ case 'A':
+ case 'a':
+ l |= 0;
+ break;
+ case 'C':
+ case 'c':
+ l |= 1;
+ break;
+ case 'G':
+ case 'g':
+ l |= 2;
+ break;
+ case 'T':
+ case 't':
+ l |= 3;
+ break;
+ }
+ count += 2;
+ if (count % 8 == 0) {
+ bcount += 1;
+ bytes[bcount] = l;
+ count = 0;
+ }
+ }
+ if (count != 0) {
+ // store the trailing partial byte; when k % 4 == 0 every byte is full
+ bytes[bcount + 1] = l;
+ }
+ return bytes;
+ }
- r = 0;
- r |= pre;
- r <<= 4;
- r |= next;
+ private byte GetBitmap(byte t) {
+ byte r = 0;
+ switch (t) {
+ case 'A':
+ case 'a':
+ r = 1;
+ break;
+ case 'C':
+ case 'c':
+ r = 2;
+ break;
+ case 'G':
+ case 'g':
+ r = 4;
+ break;
+ case 'T':
+ case 't':
+ r = 8;
+ break;
+ }
+ return r;
+ }
- /*System.out.print(l);
- System.out.print(' ');
- System.out.print(r);
- System.out.println();*/
+ private byte ConvertSymbol(byte t) {
+ byte r = 0;
+ switch (t) {
+ case 'A':
+ case 'a':
+ r = 0;
+ break;
+ case 'C':
+ case 'c':
+ r = 1;
+ break;
+ case 'G':
+ case 'g':
+ r = 2;
+ break;
+ case 'T':
+ case 't':
+ r = 3;
+ break;
+ }
+ return r;
+ }
- tupleBuilder.reset();
+ void MoveKmer(byte[] bytes, byte c) {
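+ // Shifts the packed k-mer by one base, carrying bits across the
+ // payload bytes, then appends the new base c in the low bits.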
+ byte filter0 = (byte) 0xC0;
+ byte filter1 = (byte) 0xFC;
+ byte filter2 = 0;
- //tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE, l);
- tupleBuilder.addField(bytes, 0, byteNum + 1);
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);
-
-
- //int[] a = tupleBuilder.getFieldEndOffsets();
- //int b = tupleBuilder.getSize();
- //byte[] c = tupleBuilder.getByteArray();
+ int r = byteNum * 8 - 2 * k;
+ r = 8 - r;
+ for (int i = 0; i < r; i++) {
+ filter2 <<= 1;
+ filter2 |= 1;
+ }
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),
- 0, tupleBuilder.getSize())) {
- throw new IllegalStateException(
- "Failed to copy an record into a frame: the record size is too large.");
- }
- }
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- };
+ int i = byteNum;
+ bytes[i] <<= 2;
+ bytes[i] &= filter2;
+ i -= 1;
+ while (i > 0) {
+ byte f = (byte) (bytes[i] & filter0);
+ f >>= 6;
+ f &= 3; // drop the sign-extension bits left by the arithmetic shift
+ bytes[i + 1] |= f;
+ bytes[i] <<= 2;
+ bytes[i] &= filter1;
+ i -= 1; // step toward the first payload byte so the loop terminates
+ }
+ bytes[i + 1] |= ConvertSymbol(c);
+ }
- }
+ private void SplitReads(byte[] array) {
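+ // For every k-mer in the read, emits the packed k-mer plus one byte
+ // whose high nibble is the bitmap of the preceding base and whose
+ // low nibble is the bitmap of the following base (A=1, C=2, G=4, T=8).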
+ try {
+ byte[] bytes = null;
+
+ byte pre = 0, next = 0;
+ byte r;
+
+ for (int i = 0; i < array.length - k + 1; i++) {
+ if (0 == i) {
+ bytes = CompressKmer(array, i);
+ } else {
+ MoveKmer(bytes, array[i + k - 1]);
+ /*
+ * l <<= 2; l &= window; l |= ConvertSymbol(array[i
+ * + k - 1]);
+ */
+ pre = GetBitmap(array[i - 1]);
+ }
+ if (i + k != array.length) {
+ next = GetBitmap(array[i + k]);
+ }
+
+ r = 0;
+ r |= pre;
+ r <<= 4;
+ r |= next;
+
+ /*
+ * System.out.print(l); System.out.print(' ');
+ * System.out.print(r); System.out.println();
+ */
+
+ tupleBuilder.reset();
+
+ // tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE,
+ // l);
+ tupleBuilder.addField(bytes, 0, byteNum + 1);
+ tupleBuilder.addField(
+ ByteSerializerDeserializer.INSTANCE, r);
+
+ // int[] a = tupleBuilder.getFieldEndOffsets();
+ // int b = tupleBuilder.getSize();
+ // byte[] c = tupleBuilder.getByteArray();
+
+ if (!outputAppender.append(
+ tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!outputAppender.append(
+ tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new IllegalStateException(
+ "Failed to copy a record into a frame: the record size is too large.");
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+
+ }
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java
index abcb54e..dee3a72 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/GenKmerDescriptor.java
@@ -35,125 +35,145 @@
public class GenKmerDescriptor extends AbstractOperatorDescriptor {
- private static final long serialVersionUID = 1L;
- private static final int SPLIT_ACTIVITY_ID = 0;
- private static final int MERGE_ACTIVITY_ID = 1;
- private final int framesLimit;
- private final int k;
+ private static final long serialVersionUID = 1L;
+ private static final int SPLIT_ACTIVITY_ID = 0;
+ private static final int MERGE_ACTIVITY_ID = 1;
+ private final int framesLimit;
+ private final int k;
- public GenKmerDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int k) {
- super(spec, 1, 1);
- this.framesLimit = framesLimit;
- this.k = k;
+ public GenKmerDescriptor(IOperatorDescriptorRegistry spec, int framesLimit,
+ int k) {
+ super(spec, 1, 1);
+ this.framesLimit = framesLimit;
+ this.k = k;
- // TODO Auto-generated constructor stub
- recordDescriptors[0] = new RecordDescriptor(new ISerializerDeserializer[] {
- Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
- }
+ // TODO Auto-generated constructor stub
+ recordDescriptors[0] = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ Integer64SerializerDeserializer.INSTANCE,
+ ByteSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
+ }
- @Override
- public void contributeActivities(IActivityGraphBuilder builder) {
- // TODO Auto-generated method stub
- SplitActivity sa = new SplitActivity(new ActivityId(odId, SPLIT_ACTIVITY_ID));
- MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
- builder.addActivity(this, sa);
- builder.addSourceEdge(0, sa, 0);
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ // TODO Auto-generated method stub
+ SplitActivity sa = new SplitActivity(new ActivityId(odId,
+ SPLIT_ACTIVITY_ID));
+ MergeActivity ma = new MergeActivity(new ActivityId(odId,
+ MERGE_ACTIVITY_ID));
+ builder.addActivity(this, sa);
+ builder.addSourceEdge(0, sa, 0);
- builder.addActivity(this, ma);
- builder.addTargetEdge(0, ma, 0);
+ builder.addActivity(this, ma);
+ builder.addTargetEdge(0, ma, 0);
- builder.addBlockingEdge(sa, ma);
- }
+ builder.addBlockingEdge(sa, ma);
+ }
- private class SplitActivity extends AbstractActivityNode {
- /**
+ private class SplitActivity extends AbstractActivityNode {
+ /**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public SplitActivity(ActivityId activityID) {
- super(activityID);
- // TODO Auto-generated constructor stub
- }
+ public SplitActivity(ActivityId activityID) {
+ super(activityID);
+ // TODO Auto-generated constructor stub
+ }
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
- throws HyracksDataException {
- // TODO Auto-generated method stub
- //IHyracksTaskContext ctx, int k, RecordDescriptor rd_in, int buffer_size
- KmerSplitOperatorNodePushable op = new KmerSplitOperatorNodePushable(ctx, k, new RecordDescriptor(
- new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE }), framesLimit,
- new TaskId(this.id, partition));
- return op;
- }
- }
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ // IHyracksTaskContext ctx, int k, RecordDescriptor rd_in, int
+ // buffer_size
+ KmerSplitOperatorNodePushable op = new KmerSplitOperatorNodePushable(
+ ctx,
+ k,
+ new RecordDescriptor(
+ new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE }),
+ framesLimit, new TaskId(this.id, partition));
+ return op;
+ }
+ }
- public static class SplitTaskState extends AbstractStateObject {
- List<IFrameReader> runs;
+ public static class SplitTaskState extends AbstractStateObject {
+ List<IFrameReader> runs;
- public SplitTaskState() {
- }
+ public SplitTaskState() {
+ }
- public SplitTaskState(JobId jobId, TaskId taskId, List<IFrameReader> runs) {
- super(jobId, taskId);
- this.runs = runs;
- }
+ public SplitTaskState(JobId jobId, TaskId taskId,
+ List<IFrameReader> runs) {
+ super(jobId, taskId);
+ this.runs = runs;
+ }
- @Override
- public void toBytes(DataOutput out) throws IOException {
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
- }
+ }
- @Override
- public void fromBytes(DataInput in) throws IOException {
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
- }
- }
+ }
+ }
- private class MergeActivity extends AbstractActivityNode {
+ private class MergeActivity extends AbstractActivityNode {
- /**
+ /**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- public MergeActivity(ActivityId id) {
- super(id);
- // TODO Auto-generated constructor stub
- }
+ public MergeActivity(ActivityId id) {
+ super(id);
+ // TODO Auto-generated constructor stub
+ }
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
- throws HyracksDataException {
- // TODO Auto-generated method stub
- IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
- @Override
- public void initialize() throws HyracksDataException {
- SplitTaskState state = (SplitTaskState) ctx.getStateObject(new TaskId(new ActivityId(
- getOperatorId(), SPLIT_ACTIVITY_ID), partition));
- //List<IFrameReader> runs = runs = new LinkedList<IFrameReader>();;
+ @Override
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider,
+ final int partition, int nPartitions)
+ throws HyracksDataException {
+ // TODO Auto-generated method stub
+ IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ SplitTaskState state = (SplitTaskState) ctx
+ .getStateObject(new TaskId(new ActivityId(
+ getOperatorId(), SPLIT_ACTIVITY_ID),
+ partition));
+ // List<IFrameReader> runs = runs = new
+ // LinkedList<IFrameReader>();;
- IBinaryComparator[] comparators = new IBinaryComparator[1];
- IBinaryComparatorFactory cf = PointableBinaryComparatorFactory.of(LongPointable.FACTORY);
- comparators[0] = cf.createBinaryComparator();
+ IBinaryComparator[] comparators = new IBinaryComparator[1];
+ IBinaryComparatorFactory cf = PointableBinaryComparatorFactory
+ .of(LongPointable.FACTORY);
+ comparators[0] = cf.createBinaryComparator();
- //int necessaryFrames = Math.min(runs.size() + 2, framesLimit);
+ // int necessaryFrames = Math.min(runs.size() + 2,
+ // framesLimit);
- FrameSorter frameSorter = new FrameSorter(
- ctx,
- new int[] { 0 },
- new Integer64NormalizedKeyComputerFactory(),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },
- recordDescriptors[0]);
+ FrameSorter frameSorter = new FrameSorter(
+ ctx,
+ new int[] { 0 },
+ new Integer64NormalizedKeyComputerFactory(),
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(LongPointable.FACTORY) },
+ recordDescriptors[0]);
- ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, state.runs,
- new int[] { 0 }, comparators, recordDescriptors[0], framesLimit, writer);
- merger.process();
- }
- };
- return op;
- }
- }
+ ExternalSortRunMerger merger = new ExternalSortRunMerger(
+ ctx, frameSorter, state.runs, new int[] { 0 },
+ comparators, recordDescriptors[0], framesLimit,
+ writer);
+ merger.process();
+ }
+ };
+ return op;
+ }
+ }
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
index 31e6b85..aa7dba4 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KMerWriterFactory.java
@@ -8,30 +8,31 @@
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
public class KMerWriterFactory implements ITupleWriterFactory {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- @Override
- public ITupleWriter getTupleWriter() {
- return new ITupleWriter() {
- byte newLine = "\n".getBytes()[0];
+ @Override
+ public ITupleWriter getTupleWriter() {
+ return new ITupleWriter() {
+ byte newLine = "\n".getBytes()[0];
- @Override
- public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
- try{
- for(int i = 0 ; i < 3 ; i++){
- byte[] data = tuple.getFieldData(0);
- int start = tuple.getFieldStart(0);
- int len = tuple.getFieldLength(0);
- output.write(data, start, len);
- output.writeChars(" ");
- }
- output.writeByte(newLine);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
+ @Override
+ public void write(DataOutput output, ITupleReference tuple)
+ throws HyracksDataException {
+ try {
+ for (int i = 0; i < 3; i++) {
+ byte[] data = tuple.getFieldData(i);
+ int start = tuple.getFieldStart(i);
+ int len = tuple.getFieldLength(i);
+ output.write(data, start, len);
+ output.writeByte(' '); // a single ASCII space between fields
+ }
+ output.writeByte(newLine);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
- };
- }
+ };
+ }
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java
index 1f6731d..efe054b 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/KmerSplitOperatorNodePushable.java
@@ -22,233 +22,238 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-public class KmerSplitOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+public class KmerSplitOperatorNodePushable extends
+ AbstractUnaryInputSinkOperatorNodePushable {
- private final int k;
- private long window;
+ private final int k;
+ private long window;
- private final SplitFrame frameSorter;
+ private final SplitFrame frameSorter;
- private FrameTupleAccessor accessor;
- private ArrayTupleBuilder tupleBuilder;
- private TaskId MytaskId;
- private IHyracksTaskContext ctx;
+ private FrameTupleAccessor accessor;
+ private ArrayTupleBuilder tupleBuilder;
+ private TaskId MytaskId;
+ private IHyracksTaskContext ctx;
- public KmerSplitOperatorNodePushable(IHyracksTaskContext ctx, int k, RecordDescriptor rd_in, int buffer_size,
- TaskId taskid) {
+ public KmerSplitOperatorNodePushable(IHyracksTaskContext ctx, int k,
+ RecordDescriptor rd_in, int buffer_size, TaskId taskid) {
- tupleBuilder = new ArrayTupleBuilder(3);
- this.k = k;
+ tupleBuilder = new ArrayTupleBuilder(3);
+ this.k = k;
- RecordDescriptor rd = new RecordDescriptor(new ISerializerDeserializer[] {
- Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor rd = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ Integer64SerializerDeserializer.INSTANCE,
+ ByteSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
- int[] sortFields = { 0 };
- frameSorter = new SplitFrame(ctx, sortFields, new Integer64NormalizedKeyComputerFactory(),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) }, rd,
- buffer_size);
+ int[] sortFields = { 0 };
+ frameSorter = new SplitFrame(
+ ctx,
+ sortFields,
+ new Integer64NormalizedKeyComputerFactory(),
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(LongPointable.FACTORY) }, rd, buffer_size);
- accessor = new FrameTupleAccessor(ctx.getFrameSize(), rd_in);
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), rd_in);
- new FrameTupleAccessor(ctx.getFrameSize(), rd);
- new FrameTupleAppender(ctx.getFrameSize());
+ new FrameTupleAccessor(ctx.getFrameSize(), rd);
+ new FrameTupleAppender(ctx.getFrameSize());
- ByteBuffer.allocate(ctx.getFrameSize());
+ ByteBuffer.allocate(ctx.getFrameSize());
- //initialize the window
- window = 0;
- for (int i = 0; i < k; i++) {
- window <<= 2;
- window |= 3;
- }
+ // initialize the window
+ window = 0;
+ for (int i = 0; i < k; i++) {
+ window <<= 2;
+ window |= 3;
+ }
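+ // window now has its low 2k bits set; SplitReads ANDs with it to
+ // drop the oldest base as the k-mer slides along the read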
- MytaskId = taskid;
- this.ctx = ctx;
- }
+ MytaskId = taskid;
+ this.ctx = ctx;
+ }
- @Override
- public void open() throws HyracksDataException {
- // TODO Auto-generated method stub
- //writer.open();
+ @Override
+ public void open() throws HyracksDataException {
+ // TODO Auto-generated method stub
+ // writer.open();
- }
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- ByteBuffer temp_buf = accessor.getBuffer();
- for (int i = 0; i < tupleCount; i++) {
- int tupleStartOffset = accessor.getTupleStartOffset(i);
- int fieldStartOffset = accessor.getFieldStartOffset(i, 0);
- int loadLength = accessor.getFieldLength(i, 0);
- //int loadLength = temp_buf.getInt(tupleStartOffset);
- byte[] read = new byte[loadLength];
- int slotLength = accessor.getFieldSlotsLength();
- //temp_buf.position(tupleStartOffset+fieldStartOffset + accessor.getFieldSlotsLength());
- int pos = tupleStartOffset + fieldStartOffset + slotLength;
- //temp_buf
- try {
- temp_buf.position(pos);
- temp_buf.get(read, 0, loadLength);
- SplitReads(read);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer) {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ ByteBuffer temp_buf = accessor.getBuffer();
+ for (int i = 0; i < tupleCount; i++) {
+ int tupleStartOffset = accessor.getTupleStartOffset(i);
+ int fieldStartOffset = accessor.getFieldStartOffset(i, 0);
+ int loadLength = accessor.getFieldLength(i, 0);
+ // int loadLength = temp_buf.getInt(tupleStartOffset);
+ byte[] read = new byte[loadLength];
+ int slotLength = accessor.getFieldSlotsLength();
+ // temp_buf.position(tupleStartOffset+fieldStartOffset +
+ // accessor.getFieldSlotsLength());
+ int pos = tupleStartOffset + fieldStartOffset + slotLength;
+ // temp_buf
+ try {
+ temp_buf.position(pos);
+ temp_buf.get(read, 0, loadLength);
+ SplitReads(read);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
- tupleBuilder.reset();
+ tupleBuilder.reset();
- }
- }
+ }
+ }
- private long CompressKmer(byte[] array, int start, int k) {
- // a: 00; c: 01; G: 10; T: 11
- long l = 0;
- for (int i = start; i < start + k; i++) {
- l <<= 2;
- switch (array[start + i]) {
- case 'A':
- case 'a':
- l |= 0;
- break;
- case 'C':
- case 'c':
- l |= 1;
- break;
- case 'G':
- case 'g':
- l |= 2;
- break;
- case 'T':
- case 't':
- l |= 3;
- break;
- }
- }
- return l;
- }
+ private long CompressKmer(byte[] array, int start, int k) {
+ // a: 00; c: 01; G: 10; T: 11
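+ // Packs the k bases starting at 'start' into the low 2k bits of a long.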
+ long l = 0;
+ for (int i = start; i < start + k; i++) {
+ l <<= 2;
+ switch (array[i]) { // i already runs from start to start + k - 1
+ case 'A':
+ case 'a':
+ l |= 0;
+ break;
+ case 'C':
+ case 'c':
+ l |= 1;
+ break;
+ case 'G':
+ case 'g':
+ l |= 2;
+ break;
+ case 'T':
+ case 't':
+ l |= 3;
+ break;
+ }
+ }
+ return l;
+ }
- private byte GetBitmap(byte t) {
- byte r = 0;
- switch (t) {
- case 'A':
- case 'a':
- r = 1;
- break;
- case 'C':
- case 'c':
- r = 2;
- break;
- case 'G':
- case 'g':
- r = 4;
- break;
- case 'T':
- case 't':
- r = 8;
- break;
- }
- return r;
- }
+ private byte GetBitmap(byte t) {
+ byte r = 0;
+ switch (t) {
+ case 'A':
+ case 'a':
+ r = 1;
+ break;
+ case 'C':
+ case 'c':
+ r = 2;
+ break;
+ case 'G':
+ case 'g':
+ r = 4;
+ break;
+ case 'T':
+ case 't':
+ r = 8;
+ break;
+ }
+ return r;
+ }
- private byte ConvertSymbol(byte t) {
- byte r = 0;
- switch (t) {
- case 'A':
- case 'a':
- r = 0;
- break;
- case 'C':
- case 'c':
- r = 1;
- break;
- case 'G':
- case 'g':
- r = 2;
- break;
- case 'T':
- case 't':
- r = 3;
- break;
- }
- return r;
- }
+ private byte ConvertSymbol(byte t) {
+ byte r = 0;
+ switch (t) {
+ case 'A':
+ case 'a':
+ r = 0;
+ break;
+ case 'C':
+ case 'c':
+ r = 1;
+ break;
+ case 'G':
+ case 'g':
+ r = 2;
+ break;
+ case 'T':
+ case 't':
+ r = 3;
+ break;
+ }
+ return r;
+ }
- private void SplitReads(byte[] array) {
- try {
- long l = 0;
+ private void SplitReads(byte[] array) {
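+ // The field is a UTF-8 serialized string: two length bytes, then
+ // the characters, so scanning starts at index 2.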
+ try {
+ long l = 0;
- byte pre = 0, next = 0;
- byte r;
+ byte pre = 0, next = 0;
+ byte r;
- for (int i = 2; i < array.length - k + 1; i++) {
- if (2 == i) {
- l = CompressKmer(array, i, k);
- } else {
- l <<= 2;
- l &= window;
- l |= ConvertSymbol(array[i + k - 1]);
- pre = GetBitmap(array[i - 1]);
- }
- if (i + k != array.length) {
- next = GetBitmap(array[i + k]);
- }
+ for (int i = 2; i < array.length - k + 1; i++) {
+ if (2 == i) {
+ l = CompressKmer(array, i, k);
+ } else {
+ l <<= 2;
+ l &= window;
+ l |= ConvertSymbol(array[i + k - 1]);
+ pre = GetBitmap(array[i - 1]);
+ }
+ if (i + k != array.length) {
+ next = GetBitmap(array[i + k]);
+ }
- r = 0;
- r |= pre;
- r <<= 4;
- r |= next;
+ r = 0;
+ r |= pre;
+ r <<= 4;
+ r |= next;
- /*System.out.print(l);
- System.out.print(' ');
- System.out.print(r);
- System.out.println();*/
+ /*
+ * System.out.print(l); System.out.print(' ');
+ * System.out.print(r); System.out.println();
+ */
- frameSorter.insertKmer(l, r);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
+ frameSorter.insertKmer(l, r);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
- @Override
- public void fail() throws HyracksDataException {
- // TODO Auto-generated method stub
+ @Override
+ public void fail() throws HyracksDataException {
+ // TODO Auto-generated method stub
- }
+ }
- @Override
- public void close() {
- // TODO Auto-generated method stub
- try {
- frameSorter.processLastFrame();
- SplitTaskState state = new SplitTaskState(ctx.getJobletContext().getJobId(), MytaskId,
- frameSorter.GetRuns());
- ctx.setStateObject(state);
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+ try {
+ frameSorter.processLastFrame();
+ SplitTaskState state = new SplitTaskState(ctx.getJobletContext()
+ .getJobId(), MytaskId, frameSorter.GetRuns());
+ ctx.setStateObject(state);
- //writer.close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
+ // writer.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
- public List<IFrameReader> GetRuns() {
- return frameSorter.GetRuns();
- }
+ public List<IFrameReader> GetRuns() {
+ return frameSorter.GetRuns();
+ }
- //for debug
- /* private void DumpBlock(ByteBuffer f){
-
- int n = f.array().length/13;
-
- for(int i = 0 ; i < n ; i++){
- long t = LongPointable.getLong(f.array(), 13 * i);
- System.out.print(t);
- System.out.print(' ');
- }
- System.out.println();
- }*/
+ // for debug
+ /*
+ * private void DumpBlock(ByteBuffer f){
+ *
+ * int n = f.array().length/13;
+ *
+ * for(int i = 0 ; i < n ; i++){ long t = LongPointable.getLong(f.array(),
+ * 13 * i); System.out.print(t); System.out.print(' '); }
+ * System.out.println(); }
+ */
}
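
Note: SplitReads/CompressKmer above maintain a rolling 2-bit code of the current k-mer in a long, masking with `window` after each shift. Below is a minimal standalone sketch of that encoding, assuming k <= 32 so the k-mer fits in a long; the class and method names are illustrative only, not part of the patch.

    // Sketch: rolling 2-bit k-mer window, as in SplitReads above.
    public class KmerWindowSketch {
        public static void main(String[] args) {
            byte[] read = "ACGTAC".getBytes();
            int k = 3;
            long window = (1L << (2 * k)) - 1; // keep only the low 2k bits
            long kmer = 0;
            for (int i = 0; i < k; i++) {
                kmer = (kmer << 2) | code(read[i]); // CompressKmer over the first k bases
            }
            System.out.println(kmer); // ACG -> 000110 -> 6
            for (int i = k; i < read.length; i++) {
                // slide one base: shift, append the new code, mask back to 2k bits
                kmer = ((kmer << 2) | code(read[i])) & window;
                System.out.println(kmer);
            }
        }

        private static long code(byte b) {
            switch (b) {
            case 'C': case 'c': return 1;
            case 'G': case 'g': return 2;
            case 'T': case 't': return 3;
            default: return 0; // 'A'/'a'; the operator maps other bytes to 0 as well
            }
        }
    }
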
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
index fb0fc18..9070a35 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
@@ -7,7 +7,6 @@
import java.io.OutputStreamWriter;
import java.nio.ByteBuffer;
-
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -20,149 +19,133 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-public class PrinterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
- private String filename;
- private boolean writeFile;
- private BufferedWriter twriter;
- private FileOutputStream stream;
+public class PrinterOperatorDescriptor extends
+ AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ private String filename;
+ private boolean writeFile;
+ private BufferedWriter twriter;
+ private FileOutputStream stream;
- /**
- * The constructor of HDFSWriteOperatorDescriptor.
- *
- * @param spec
- * the JobSpecification object
- * @param conf
- * the Hadoop JobConf which contains the output path
- * @param tupleWriterFactory
- * the ITupleWriterFactory implementation object
- * @throws HyracksException
- */
- public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec) {
- super(spec, 1, 0);
- writeFile = false;
- }
-
- public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec, String filename) {
- super(spec, 1, 0);
- this.filename = filename;
- writeFile = true;
- }
+ /**
+ * The constructor of HDFSWriteOperatorDescriptor.
+ *
+ * @param spec
+ * the JobSpecification object
+ * @param conf
+ * the Hadoop JobConf which contains the output path
+ * @param tupleWriterFactory
+ * the ITupleWriterFactory implementation object
+ * @throws HyracksException
+ */
+ public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec) {
+ super(spec, 1, 0);
+ writeFile = false;
+ }
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
+ public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ String filename) {
+ super(spec, 1, 0);
+ this.filename = filename;
+ writeFile = true;
+ }
- return new AbstractUnaryInputSinkOperatorNodePushable() {
- private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
- private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
- private FrameTupleReference tuple = new FrameTupleReference();
+ @Override
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider,
+ final int partition, final int nPartitions)
+ throws HyracksDataException {
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+ private RecordDescriptor inputRd = recordDescProvider
+					.getInputRecordDescriptor(getActivityId(), 0);
+ private FrameTupleAccessor accessor = new FrameTupleAccessor(
+ ctx.getFrameSize(), inputRd);
+ private FrameTupleReference tuple = new FrameTupleReference();
- @Override
- public void open() throws HyracksDataException {
- if( true == writeFile){
- try {
- filename = filename + String.valueOf(partition)+".txt";
- //System.err.println(filename);
- stream = new FileOutputStream(filename);
- } catch (FileNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- twriter = new BufferedWriter(new OutputStreamWriter(stream));
- }
- }
+ @Override
+ public void open() throws HyracksDataException {
+ if (true == writeFile) {
+ try {
+ filename = filename + String.valueOf(partition)
+ + ".txt";
+ // System.err.println(filename);
+ stream = new FileOutputStream(filename);
+ } catch (FileNotFoundException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ twriter = new BufferedWriter(new OutputStreamWriter(stream));
+ }
+ }
-
- private void PrintBytes(int no){
- try{
-
- byte[] bytes = tuple.getFieldData(no);
- int offset = tuple.getFieldStart(no);
- int length = tuple.getFieldLength(no);
- if(true == writeFile){
- for(int j = offset ; j < offset + length ; j++){
- twriter.write(String.valueOf((int)bytes[j]));
- twriter.write(" ");
- }
- twriter.write("&&");
- }
- else{
- for(int j = offset ; j < offset + length ;j++){
- System.err.print(String.valueOf((int)bytes[j]));
- System.err.print(" ");
- }
- System.err.print("&&");
- }
- }
- catch(IOException e){
- e.printStackTrace();
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- try{
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- tuple.reset(accessor, i);
- int tj = tuple.getFieldCount();
- for(int j = 0 ; j< tj ; j++){
- PrintBytes(j);
- }
- if(true == writeFile){
- twriter.write("\n");
- }
- else{
- System.err.println();
- }
- }
- }
- catch(IOException e){
- e.printStackTrace();
- }
- }
+ private void PrintBytes(int no) {
+ try {
- @Override
- public void fail() throws HyracksDataException {
+ byte[] bytes = tuple.getFieldData(no);
+ int offset = tuple.getFieldStart(no);
+ int length = tuple.getFieldLength(no);
+ if (true == writeFile) {
+ for (int j = offset; j < offset + length; j++) {
+ twriter.write(String.valueOf((int) bytes[j]));
+ twriter.write(" ");
+ }
+ twriter.write("&&");
+ } else {
+ for (int j = offset; j < offset + length; j++) {
+ System.err.print(String.valueOf((int) bytes[j]));
+ System.err.print(" ");
+ }
+ System.err.print("&&");
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer)
+ throws HyracksDataException {
+ try {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ tuple.reset(accessor, i);
+ int tj = tuple.getFieldCount();
+ for (int j = 0; j < tj; j++) {
+ PrintBytes(j);
+ }
+ if (true == writeFile) {
+ twriter.write("\n");
+ } else {
+ System.err.println();
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
- @Override
- public void close() throws HyracksDataException {
- if( true == writeFile){
- try {
- twriter.close();
- stream.close();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
+ @Override
+ public void fail() throws HyracksDataException {
- };
- }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (true == writeFile) {
+ try {
+ twriter.close();
+ stream.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+
+ };
+ }
}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
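
Both the scan operator and the printer above locate a field's raw bytes with the same arithmetic: tuple start offset, plus the field-slot area, plus the field's relative start. A short sketch of that access pattern follows; the helper name is hypothetical, the accessor calls are the ones used in the code above, and the layout assumption (one 4-byte end-offset slot per field at the head of each tuple) is the usual Hyracks frame format.

    import java.nio.ByteBuffer;

    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;

    // Sketch: copy field f of tuple i out of a frame.
    public class FieldBytesSketch {
        static byte[] fieldBytes(FrameTupleAccessor accessor, int i, int f) {
            ByteBuffer buf = accessor.getBuffer();
            int pos = accessor.getTupleStartOffset(i)
                    + accessor.getFieldSlotsLength()      // fieldCount * 4 slot bytes
                    + accessor.getFieldStartOffset(i, f); // relative to the data area
            byte[] out = new byte[accessor.getFieldLength(i, f)];
            for (int j = 0; j < out.length; j++) {
                out[j] = buf.get(pos + j); // absolute get(): buffer position untouched
            }
            return out;
        }
    }
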
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
index 2d7ee70..60eaa35 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/ReadsKeyValueParserFactory.java
@@ -17,216 +17,225 @@
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
-public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
- private static final long serialVersionUID = 1L;
+public class ReadsKeyValueParserFactory implements
+ IKeyValueParserFactory<LongWritable, Text> {
+ private static final long serialVersionUID = 1L;
- private int k;
- private int byteNum;
-
- public ReadsKeyValueParserFactory(int k){
- this.k = k;
- byteNum = (byte)Math.ceil((double)k/4.0);
- }
- @Override
- public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
-;
-
- final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
- final ByteBuffer outputBuffer = ctx.allocateFrame();
- final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
- outputAppender.reset(outputBuffer, true);
-
+ private int k;
+ private int byteNum;
- return new IKeyValueParser<LongWritable, Text>() {
+ public ReadsKeyValueParserFactory(int k) {
+ this.k = k;
+ byteNum = (byte) Math.ceil((double) k / 4.0);
+ }
- @Override
- public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
- String geneLine = value.toString(); // Read the Real Gene Line
- Pattern genePattern = Pattern.compile("[AGCT]+");
- Matcher geneMatcher = genePattern.matcher(geneLine);
- boolean isValid = geneMatcher.matches();
- if(isValid){
- SplitReads(geneLine.getBytes(), writer);
- }
- }
+ @Override
+ public IKeyValueParser<LongWritable, Text> createKeyValueParser(
+ final IHyracksTaskContext ctx) {
- @Override
- public void flush(IFrameWriter writer) throws HyracksDataException {
- FrameUtils.flushFrame(outputBuffer, writer);
- }
-
-
- private byte[] CompressKmer(byte[] array, int start) {
- // a: 00; c: 01; G: 10; T: 11
-
- byte[] bytes = new byte[byteNum+1];
- bytes[0] = (byte) k;
-
- byte l = 0;
- int count = 0;
- int bcount = 0;
-
- for (int i = start; i < start + k; i++) {
- l <<= 2;
- switch (array[i]) {
- case 'A':
- case 'a':
- l |= 0;
- break;
- case 'C':
- case 'c':
- l |= 1;
- break;
- case 'G':
- case 'g':
- l |= 2;
- break;
- case 'T':
- case 't':
- l |= 3;
- break;
- }
- count += 2;
- if(count%8==0){
- bcount += 1;
- bytes[bcount] = l;
- count = 0;
- }
- }
- bytes[bcount + 1] = l;
- return bytes;
- }
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+ final ByteBuffer outputBuffer = ctx.allocateFrame();
+ final FrameTupleAppender outputAppender = new FrameTupleAppender(
+ ctx.getFrameSize());
+ outputAppender.reset(outputBuffer, true);
- private byte GetBitmap(byte t) {
- byte r = 0;
- switch (t) {
- case 'A':
- case 'a':
- r = 1;
- break;
- case 'C':
- case 'c':
- r = 2;
- break;
- case 'G':
- case 'g':
- r = 4;
- break;
- case 'T':
- case 't':
- r = 8;
- break;
- }
- return r;
- }
+ return new IKeyValueParser<LongWritable, Text>() {
- private byte ConvertSymbol(byte t) {
- byte r = 0;
- switch (t) {
- case 'A':
- case 'a':
- r = 0;
- break;
- case 'C':
- case 'c':
- r = 1;
- break;
- case 'G':
- case 'g':
- r = 2;
- break;
- case 'T':
- case 't':
- r = 3;
- break;
- }
- return r;
- }
-
- void MoveKmer(byte[] bytes, byte c){
- byte filter0 = (byte) 0xC0;
- byte filter1 = (byte) 0xFC;
- byte filter2 = 0;
-
- int r = byteNum*8 - 2*k;
- r = 8 - r;
- for(int i = 0 ; i < r ; i++){
- filter2 <<= 1;
- filter2 |= 1;
- }
+ @Override
+ public void parse(LongWritable key, Text value, IFrameWriter writer)
+ throws HyracksDataException {
+				String geneLine = value.toString(); // read the real gene line
+				// note: the pattern is compiled per record and accepts only
+				// upper-case bases, though the coding below handles lower-case too
+				Pattern genePattern = Pattern.compile("[AGCT]+");
+ Matcher geneMatcher = genePattern.matcher(geneLine);
+ boolean isValid = geneMatcher.matches();
+ if (isValid) {
+ SplitReads(geneLine.getBytes(), writer);
+ }
+ }
- int i = byteNum;
- bytes[i] <<= 2;
- bytes[i] &= filter2;
- i -= 1;
- while(i > 0){
- byte f = (byte) (bytes[i] & filter0);
- f >>= 6;
- bytes[i+1] |= f;
- bytes[i] <<= 2;
- bytes[i] &= filter1;
- }
- bytes[i+1] |= ConvertSymbol(c);
- }
+ @Override
+ public void flush(IFrameWriter writer) throws HyracksDataException {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
- private void SplitReads(byte[] array, IFrameWriter writer) {
- try {
- byte[] bytes=null;
-
- byte pre = 0, next = 0;
- byte r;
+ private byte[] CompressKmer(byte[] array, int start) {
+				// A: 00; C: 01; G: 10; T: 11
- for (int i = 0; i < array.length - k + 1; i++) {
- if (0 == i) {
- bytes = CompressKmer(array, i);
- } else {
- MoveKmer(bytes, array[i + k - 1]);
- /*l <<= 2;
- l &= window;
- l |= ConvertSymbol(array[i + k - 1]);*/
- pre = GetBitmap(array[i - 1]);
- }
- if (i + k != array.length) {
- next = GetBitmap(array[i + k]);
- }
+ byte[] bytes = new byte[byteNum + 1];
+ bytes[0] = (byte) k;
- r = 0;
- r |= pre;
- r <<= 4;
- r |= next;
+ byte l = 0;
+ int count = 0;
+ int bcount = 0;
- /*System.out.print(l);
- System.out.print(' ');
- System.out.print(r);
- System.out.println();*/
+ for (int i = start; i < start + k; i++) {
+ l <<= 2;
+ switch (array[i]) {
+ case 'A':
+ case 'a':
+ l |= 0;
+ break;
+ case 'C':
+ case 'c':
+ l |= 1;
+ break;
+ case 'G':
+ case 'g':
+ l |= 2;
+ break;
+ case 'T':
+ case 't':
+ l |= 3;
+ break;
+ }
+					count += 2;
+					if (count % 8 == 0) {
+						bcount += 1;
+						bytes[bcount] = l;
+						count = 0;
+					}
+				}
+				// only flush a partial last byte; when k is a multiple of 4 the
+				// last byte was already written and bcount + 1 is out of bounds
+				if (count != 0) {
+					bytes[bcount + 1] = l;
+				}
+ return bytes;
+ }
- tupleBuilder.reset();
+ private byte GetBitmap(byte t) {
+ byte r = 0;
+ switch (t) {
+ case 'A':
+ case 'a':
+ r = 1;
+ break;
+ case 'C':
+ case 'c':
+ r = 2;
+ break;
+ case 'G':
+ case 'g':
+ r = 4;
+ break;
+ case 'T':
+ case 't':
+ r = 8;
+ break;
+ }
+ return r;
+ }
- //tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE, l);
- tupleBuilder.addField(bytes, 0, byteNum + 1);
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);
-
-
- //int[] a = tupleBuilder.getFieldEndOffsets();
- //int b = tupleBuilder.getSize();
- //byte[] c = tupleBuilder.getByteArray();
+ private byte ConvertSymbol(byte t) {
+ byte r = 0;
+ switch (t) {
+ case 'A':
+ case 'a':
+ r = 0;
+ break;
+ case 'C':
+ case 'c':
+ r = 1;
+ break;
+ case 'G':
+ case 'g':
+ r = 2;
+ break;
+ case 'T':
+ case 't':
+ r = 3;
+ break;
+ }
+ return r;
+ }
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(),
- 0, tupleBuilder.getSize())) {
- throw new IllegalStateException(
- "Failed to copy an record into a frame: the record size is too large.");
- }
- }
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- };
- }
+ void MoveKmer(byte[] bytes, byte c) {
+ byte filter0 = (byte) 0xC0;
+ byte filter1 = (byte) 0xFC;
+ byte filter2 = 0;
+
+ int r = byteNum * 8 - 2 * k;
+ r = 8 - r;
+ for (int i = 0; i < r; i++) {
+ filter2 <<= 1;
+ filter2 |= 1;
+ }
+
+ int i = byteNum;
+ bytes[i] <<= 2;
+ bytes[i] &= filter2;
+ i -= 1;
+				while (i > 0) {
+					// mask before the shift so the byte's sign bit cannot
+					// smear ones across the carried value
+					int f = (bytes[i] & filter0 & 0xFF) >>> 6;
+					bytes[i + 1] |= f;
+					bytes[i] <<= 2;
+					bytes[i] &= filter1;
+					i -= 1; // was missing: the loop never terminated without it
+				}
+				bytes[i + 1] |= ConvertSymbol(c);
+ }
+
+ private void SplitReads(byte[] array, IFrameWriter writer) {
+ try {
+ byte[] bytes = null;
+
+ byte pre = 0, next = 0;
+ byte r;
+
+ for (int i = 0; i < array.length - k + 1; i++) {
+ if (0 == i) {
+ bytes = CompressKmer(array, i);
+ } else {
+ MoveKmer(bytes, array[i + k - 1]);
+ /*
+ * l <<= 2; l &= window; l |= ConvertSymbol(array[i
+ * + k - 1]);
+ */
+ pre = GetBitmap(array[i - 1]);
+ }
+ if (i + k != array.length) {
+ next = GetBitmap(array[i + k]);
+ }
+
+ r = 0;
+ r |= pre;
+ r <<= 4;
+ r |= next;
+
+ /*
+ * System.out.print(l); System.out.print(' ');
+ * System.out.print(r); System.out.println();
+ */
+
+ tupleBuilder.reset();
+
+ // tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE,
+ // l);
+ tupleBuilder.addField(bytes, 0, byteNum + 1);
+ tupleBuilder.addField(
+ ByteSerializerDeserializer.INSTANCE, r);
+
+ // int[] a = tupleBuilder.getFieldEndOffsets();
+ // int b = tupleBuilder.getSize();
+ // byte[] c = tupleBuilder.getByteArray();
+
+ if (!outputAppender.append(
+ tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!outputAppender.append(
+ tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new IllegalStateException(
+ "Failed to copy an record into a frame: the record size is too large.");
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+ }
}
\ No newline at end of file
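
MoveKmer above slides the byte-packed k-mer by one base: shift every payload byte left two bits, carry each byte's top two bits into the next byte, and drop the new base code into the low end. A minimal sketch of that update, assuming the layout bytes[0] = k with bytes[byteNum] the most significant payload byte; the names here are illustrative.

    // Sketch of the MoveKmer carry logic.
    public class PackedKmerSketch {
        static int code(byte b) { // same 2-bit coding as ConvertSymbol
            switch (b) {
            case 'C': case 'c': return 1;
            case 'G': case 'g': return 2;
            case 'T': case 't': return 3;
            default: return 0; // 'A'/'a' and anything else
            }
        }

        static void shiftInBase(byte[] bytes, int byteNum, int k, byte base) {
            int usedBits = 8 - (byteNum * 8 - 2 * k); // bits used in the top byte
            bytes[byteNum] = (byte) ((bytes[byteNum] << 2) & ((1 << usedBits) - 1));
            for (int i = byteNum - 1; i > 0; i--) {
                bytes[i + 1] |= (bytes[i] & 0xC0) >>> 6; // carry the top 2 bits up
                bytes[i] <<= 2;
            }
            bytes[1] |= code(base); // the new base enters the low 2 bits
        }
    }
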
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java
index 179c983..0c8bd07 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/SplitFrame.java
@@ -42,443 +42,456 @@
public class SplitFrame {
- private static int HASH_SIZE = 4096;
- private final SerializableHashTable table;
- private final TuplePointer tempPointer;
- private ArrayTupleBuilder tupleBuilder;
- private final int buf_size;
+ private static int HASH_SIZE = 4096;
+ private final SerializableHashTable table;
+ private final TuplePointer tempPointer;
+ private ArrayTupleBuilder tupleBuilder;
+ private final int buf_size;
- private final IHyracksTaskContext ctx;
- private final int[] sortFields;
- private final INormalizedKeyComputer nkc;
- private final IBinaryComparator[] comparators;
- private final ByteBuffer[] buffers;
+ private final IHyracksTaskContext ctx;
+ private final int[] sortFields;
+ private final INormalizedKeyComputer nkc;
+ private final IBinaryComparator[] comparators;
+ private final ByteBuffer[] buffers;
- private final FrameTupleAccessor fta1;
- private final FrameTupleAccessor fta2;
+ private final FrameTupleAccessor fta1;
+ private final FrameTupleAccessor fta2;
- private final FrameTupleAppender appender;
+ private final FrameTupleAppender appender;
- private final ByteBuffer outFrame;
+ private final ByteBuffer outFrame;
- private int dataFrameCount;
- private int[] tPointers;
- private int tupleCount;
- private final List<IFrameReader> runs;
- private int flushCount;
- private RecordDescriptor recordDescriptor;
+ private int dataFrameCount;
+ private int[] tPointers;
+ private int tupleCount;
+ private final List<IFrameReader> runs;
+ private int flushCount;
+ private RecordDescriptor recordDescriptor;
- private int FrameTupleCount;
+ private int FrameTupleCount;
- public SplitFrame(IHyracksTaskContext ctx, int[] sortFields,
- INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor, int buf_size) {
- this.ctx = ctx;
- this.sortFields = sortFields;
- this.recordDescriptor = recordDescriptor;
+ public SplitFrame(IHyracksTaskContext ctx, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+ IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, int buf_size) {
+ this.ctx = ctx;
+ this.sortFields = sortFields;
+ this.recordDescriptor = recordDescriptor;
- nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
- comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
- fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
- appender = new FrameTupleAppender(ctx.getFrameSize());
- outFrame = ctx.allocateFrame();
- table = new SerializableHashTable(HASH_SIZE, ctx);
- dataFrameCount = 0;
+ nkc = firstKeyNormalizerFactory == null ? null
+ : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+ comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ outFrame = ctx.allocateFrame();
+ table = new SerializableHashTable(HASH_SIZE, ctx);
+ dataFrameCount = 0;
- tempPointer = new TuplePointer();
- tupleBuilder = new ArrayTupleBuilder(3);
- this.buf_size = buf_size;
- buffers = new ByteBuffer[buf_size];
- for (int i = 0; i < buf_size; i++) {
- buffers[i] = ByteBuffer.allocate(ctx.getFrameSize());
- }
- appender.reset(buffers[0], true);
- flushCount = 0;
- runs = new LinkedList<IFrameReader>();
- FrameTupleCount = 0;
- }
+ tempPointer = new TuplePointer();
+ tupleBuilder = new ArrayTupleBuilder(3);
+ this.buf_size = buf_size;
+ buffers = new ByteBuffer[buf_size];
+ for (int i = 0; i < buf_size; i++) {
+ buffers[i] = ByteBuffer.allocate(ctx.getFrameSize());
+ }
+ appender.reset(buffers[0], true);
+ flushCount = 0;
+ runs = new LinkedList<IFrameReader>();
+ FrameTupleCount = 0;
+ }
- public void reset() {
- dataFrameCount = 0;
- tupleCount = 0;
- appender.reset(buffers[0], true);
- }
+ public void reset() {
+ dataFrameCount = 0;
+ tupleCount = 0;
+ appender.reset(buffers[0], true);
+ }
- public int getFrameCount() {
- return dataFrameCount;
- }
+ public int getFrameCount() {
+ return dataFrameCount;
+ }
- private void SearchHashTable(long entry, TuplePointer dataPointer) {
- int offset = 0;
- int tp = (int) (entry % HASH_SIZE);
- if (tp < 0) {
- tp = -tp;
- }
- do {
- table.getTuplePointer(tp, offset, dataPointer);// what is the offset mean?
- if (dataPointer.frameIndex < 0 || dataPointer.tupleIndex < 0)
- break;
- int bIndex = dataPointer.frameIndex;
- int tIndex = dataPointer.tupleIndex;
- fta1.reset(buffers[bIndex]);
+ private void SearchHashTable(long entry, TuplePointer dataPointer) {
+ int offset = 0;
+ int tp = (int) (entry % HASH_SIZE);
+ if (tp < 0) {
+ tp = -tp;
+ }
+ do {
+			table.getTuplePointer(tp, offset, dataPointer); // what does the offset mean?
+ if (dataPointer.frameIndex < 0 || dataPointer.tupleIndex < 0)
+ break;
+ int bIndex = dataPointer.frameIndex;
+ int tIndex = dataPointer.tupleIndex;
+ fta1.reset(buffers[bIndex]);
- /* System.out.print("a:");
- System.out.print(tIndex);
- System.out.print(" b");
- System.out.print(fta1.getTupleCount());
- System.out.println();*/
+ /*
+ * System.out.print("a:"); System.out.print(tIndex);
+ * System.out.print(" b"); System.out.print(fta1.getTupleCount());
+ * System.out.println();
+ */
- int tupleOffset = fta1.getTupleStartOffset(tIndex);
- int fieldOffset = fta1.getFieldStartOffset(tIndex, 0);
- int slotLength = fta1.getFieldSlotsLength();
- int pos = tupleOffset + fieldOffset + slotLength;
- long l = buffers[bIndex].getLong(pos);
- if (l == entry) {
- break;
- }
- offset += 1;
- } while (true);
- }
+ int tupleOffset = fta1.getTupleStartOffset(tIndex);
+ int fieldOffset = fta1.getFieldStartOffset(tIndex, 0);
+ int slotLength = fta1.getFieldSlotsLength();
+ int pos = tupleOffset + fieldOffset + slotLength;
+ long l = buffers[bIndex].getLong(pos);
+ if (l == entry) {
+ break;
+ }
+ offset += 1;
+ } while (true);
+ }
- private void InsertHashTable(long entry, int frame_id, int tuple_id) {
+ private void InsertHashTable(long entry, int frame_id, int tuple_id) {
- tempPointer.frameIndex = frame_id;
- tempPointer.tupleIndex = tuple_id;
+ tempPointer.frameIndex = frame_id;
+ tempPointer.tupleIndex = tuple_id;
- //System.out.print(frame_id);
- //System.out.print(' ');
- //System.out.println(tuple_id);
+ // System.out.print(frame_id);
+ // System.out.print(' ');
+ // System.out.println(tuple_id);
- int tp = (int) (entry % HASH_SIZE);
- if (tp < 0) {
- tp = -tp;
- }
- table.insert(tp, tempPointer);
+ int tp = (int) (entry % HASH_SIZE);
+ if (tp < 0) {
+ tp = -tp;
+ }
+ table.insert(tp, tempPointer);
- }
+ }
- public void insertKmer(long l, byte r) {
- try {
- SearchHashTable(l, tempPointer);
- if (tempPointer.frameIndex != -1 && tempPointer.tupleIndex != -1) {
- fta1.reset(buffers[tempPointer.frameIndex]);
- int tStart = fta1.getTupleStartOffset(tempPointer.tupleIndex);
- int f0StartRel = fta1.getFieldStartOffset(tempPointer.tupleIndex, 1);
- int slotLength = fta1.getFieldSlotsLength();
- int pos = f0StartRel + tStart + slotLength;
+ public void insertKmer(long l, byte r) {
+ try {
+ SearchHashTable(l, tempPointer);
+ if (tempPointer.frameIndex != -1 && tempPointer.tupleIndex != -1) {
+ fta1.reset(buffers[tempPointer.frameIndex]);
+ int tStart = fta1.getTupleStartOffset(tempPointer.tupleIndex);
+ int f0StartRel = fta1.getFieldStartOffset(
+ tempPointer.tupleIndex, 1);
+ int slotLength = fta1.getFieldSlotsLength();
+ int pos = f0StartRel + tStart + slotLength;
- buffers[tempPointer.frameIndex].array()[pos] |= r;
- buffers[tempPointer.frameIndex].position(pos + 1);
- int temp_int = buffers[tempPointer.frameIndex].getInt();
- temp_int += 1;
- buffers[tempPointer.frameIndex].position(pos + 1);
- buffers[tempPointer.frameIndex].putInt(temp_int);
- } else {
- tupleBuilder.reset();
- tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE, l);
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);
- tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, 1);
+ buffers[tempPointer.frameIndex].array()[pos] |= r;
+ buffers[tempPointer.frameIndex].position(pos + 1);
+ int temp_int = buffers[tempPointer.frameIndex].getInt();
+ temp_int += 1;
+ buffers[tempPointer.frameIndex].position(pos + 1);
+ buffers[tempPointer.frameIndex].putInt(temp_int);
+ } else {
+ tupleBuilder.reset();
+ tupleBuilder.addField(Integer64SerializerDeserializer.INSTANCE,
+ l);
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, r);
+ tupleBuilder
+ .addField(IntegerSerializerDeserializer.INSTANCE, 1);
- /*System.out.print(l);
- System.out.print(' ');
- System.out.print(r);
- System.out.println();*/
- boolean b = appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize());
+ /*
+ * System.out.print(l); System.out.print(' ');
+ * System.out.print(r); System.out.println();
+ */
+ boolean b = appender.append(tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
- if (!b) {
- dataFrameCount++;
- FrameTupleCount = 0;
- if (dataFrameCount < buf_size) {
- appender.reset(buffers[dataFrameCount], true);
- } else {
- sortFrames();
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
- ExternalSortRunGenerator.class.getSimpleName());
- RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());
- writer.open();
- try {
- flushCount += 1;
- flushFrames(writer);
- } finally {
- writer.close();
- }
- runs.add(writer.createReader());
- dataFrameCount = 0;
- appender.reset(buffers[dataFrameCount], true);
- }
- boolean tb = appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize());
- if (!tb) {
- throw new HyracksDataException(
- "Failed to copy an record into a frame: the record size is too large");
- }
- }
- InsertHashTable(l, dataFrameCount, FrameTupleCount);
- FrameTupleCount += 1;
- }
- } catch (HyracksDataException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
+ if (!b) {
+ dataFrameCount++;
+ FrameTupleCount = 0;
+ if (dataFrameCount < buf_size) {
+ appender.reset(buffers[dataFrameCount], true);
+ } else {
+ sortFrames();
+ FileReference file = ctx.getJobletContext()
+ .createManagedWorkspaceFile(
+ ExternalSortRunGenerator.class
+ .getSimpleName());
+ RunFileWriter writer = new RunFileWriter(file,
+ ctx.getIOManager());
+ writer.open();
+ try {
+ flushCount += 1;
+ flushFrames(writer);
+ } finally {
+ writer.close();
+ }
+ runs.add(writer.createReader());
+ dataFrameCount = 0;
+ appender.reset(buffers[dataFrameCount], true);
+ }
+ boolean tb = appender.append(
+ tupleBuilder.getFieldEndOffsets(),
+ tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize());
+ if (!tb) {
+ throw new HyracksDataException(
+ "Failed to copy an record into a frame: the record size is too large");
+ }
+ }
+ InsertHashTable(l, dataFrameCount, FrameTupleCount);
+ FrameTupleCount += 1;
+ }
+ } catch (HyracksDataException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
- public void sortFrames() {
- int nBuffers = dataFrameCount;
- tupleCount = 0;
- for (int i = 0; i < nBuffers; ++i) {
- fta1.reset(buffers[i]);
- tupleCount += fta1.getTupleCount();
- }
- int sfIdx = sortFields[0];
- tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4] : tPointers;
- int ptr = 0;
- for (int i = 0; i < nBuffers; ++i) {
- fta1.reset(buffers[i]);
- int tCount = fta1.getTupleCount();
- byte[] array = fta1.getBuffer().array();
- for (int j = 0; j < tCount; ++j) {
- int tStart = fta1.getTupleStartOffset(j);
- int tEnd = fta1.getTupleEndOffset(j);
- tPointers[ptr * 4] = i;
- tPointers[ptr * 4 + 1] = tStart;
- tPointers[ptr * 4 + 2] = tEnd;
- int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);
- int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);
- int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();
- tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array, f0Start, f0EndRel - f0StartRel);
- ++ptr;
- }
- }
- if (tupleCount > 0) {
- sort(tPointers, 0, tupleCount);
- }
+ public void sortFrames() {
+ int nBuffers = dataFrameCount;
+ tupleCount = 0;
+ for (int i = 0; i < nBuffers; ++i) {
+ fta1.reset(buffers[i]);
+ tupleCount += fta1.getTupleCount();
+ }
+ int sfIdx = sortFields[0];
+ tPointers = tPointers == null || tPointers.length < tupleCount * 4 ? new int[tupleCount * 4]
+ : tPointers;
+ int ptr = 0;
+ for (int i = 0; i < nBuffers; ++i) {
+ fta1.reset(buffers[i]);
+ int tCount = fta1.getTupleCount();
+ byte[] array = fta1.getBuffer().array();
+ for (int j = 0; j < tCount; ++j) {
+ int tStart = fta1.getTupleStartOffset(j);
+ int tEnd = fta1.getTupleEndOffset(j);
+ tPointers[ptr * 4] = i;
+ tPointers[ptr * 4 + 1] = tStart;
+ tPointers[ptr * 4 + 2] = tEnd;
+ int f0StartRel = fta1.getFieldStartOffset(j, sfIdx);
+ int f0EndRel = fta1.getFieldEndOffset(j, sfIdx);
+ int f0Start = f0StartRel + tStart + fta1.getFieldSlotsLength();
+ tPointers[ptr * 4 + 3] = nkc == null ? 0 : nkc.normalize(array,
+ f0Start, f0EndRel - f0StartRel);
+ ++ptr;
+ }
+ }
+ if (tupleCount > 0) {
+ sort(tPointers, 0, tupleCount);
+ }
- DumpAllBuffers();
- //point the pointer to the first one
- dataFrameCount = 0;
- }
+ DumpAllBuffers();
+ // point the pointer to the first one
+ dataFrameCount = 0;
+ }
- public void flushFrames(IFrameWriter writer) throws HyracksDataException {
- appender.reset(outFrame, true);
- for (int ptr = 0; ptr < tupleCount; ++ptr) {
- int i = tPointers[ptr * 4];
- int tStart = tPointers[ptr * 4 + 1];
- int tEnd = tPointers[ptr * 4 + 2];
- ByteBuffer buffer = buffers[i];
- fta1.reset(buffer);
- if (!appender.append(fta1, tStart, tEnd)) {
- FrameUtils.flushFrame(outFrame, writer);
- appender.reset(outFrame, true);
- if (!appender.append(fta1, tStart, tEnd)) {
- throw new IllegalStateException();
- }
- }
- }
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- }
- }
+ public void flushFrames(IFrameWriter writer) throws HyracksDataException {
+ appender.reset(outFrame, true);
+ for (int ptr = 0; ptr < tupleCount; ++ptr) {
+ int i = tPointers[ptr * 4];
+ int tStart = tPointers[ptr * 4 + 1];
+ int tEnd = tPointers[ptr * 4 + 2];
+ ByteBuffer buffer = buffers[i];
+ fta1.reset(buffer);
+ if (!appender.append(fta1, tStart, tEnd)) {
+ FrameUtils.flushFrame(outFrame, writer);
+ appender.reset(outFrame, true);
+ if (!appender.append(fta1, tStart, tEnd)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ }
- private void sort(int[] tPointers, int offset, int length) {
- int m = offset + (length >> 1);
- int mi = tPointers[m * 4];
- int mj = tPointers[m * 4 + 1];
- int mv = tPointers[m * 4 + 3];
+ private void sort(int[] tPointers, int offset, int length) {
+ int m = offset + (length >> 1);
+ int mi = tPointers[m * 4];
+ int mj = tPointers[m * 4 + 1];
+ int mv = tPointers[m * 4 + 3];
- int a = offset;
- int b = a;
- int c = offset + length - 1;
- int d = c;
- while (true) {
- while (b <= c) {
- int cmp = compare(tPointers, b, mi, mj, mv);
- if (cmp > 0) {
- break;
- }
- if (cmp == 0) {
- swap(tPointers, a++, b);
- }
- ++b;
- }
- while (c >= b) {
- int cmp = compare(tPointers, c, mi, mj, mv);
- if (cmp < 0) {
- break;
- }
- if (cmp == 0) {
- swap(tPointers, c, d--);
- }
- --c;
- }
- if (b > c)
- break;
- swap(tPointers, b++, c--);
- }
+ int a = offset;
+ int b = a;
+ int c = offset + length - 1;
+ int d = c;
+ while (true) {
+ while (b <= c) {
+ int cmp = compare(tPointers, b, mi, mj, mv);
+ if (cmp > 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(tPointers, a++, b);
+ }
+ ++b;
+ }
+ while (c >= b) {
+ int cmp = compare(tPointers, c, mi, mj, mv);
+ if (cmp < 0) {
+ break;
+ }
+ if (cmp == 0) {
+ swap(tPointers, c, d--);
+ }
+ --c;
+ }
+ if (b > c)
+ break;
+ swap(tPointers, b++, c--);
+ }
- int s;
- int n = offset + length;
- s = Math.min(a - offset, b - a);
- vecswap(tPointers, offset, b - s, s);
- s = Math.min(d - c, n - d - 1);
- vecswap(tPointers, b, n - s, s);
+ int s;
+ int n = offset + length;
+ s = Math.min(a - offset, b - a);
+ vecswap(tPointers, offset, b - s, s);
+ s = Math.min(d - c, n - d - 1);
+ vecswap(tPointers, b, n - s, s);
- if ((s = b - a) > 1) {
- sort(tPointers, offset, s);
- }
- if ((s = d - c) > 1) {
- sort(tPointers, n - s, s);
- }
- }
+ if ((s = b - a) > 1) {
+ sort(tPointers, offset, s);
+ }
+ if ((s = d - c) > 1) {
+ sort(tPointers, n - s, s);
+ }
+ }
- private void swap(int x[], int a, int b) {
- for (int i = 0; i < 4; ++i) {
- int t = x[a * 4 + i];
- x[a * 4 + i] = x[b * 4 + i];
- x[b * 4 + i] = t;
- }
- }
+ private void swap(int x[], int a, int b) {
+ for (int i = 0; i < 4; ++i) {
+ int t = x[a * 4 + i];
+ x[a * 4 + i] = x[b * 4 + i];
+ x[b * 4 + i] = t;
+ }
+ }
- private void vecswap(int x[], int a, int b, int n) {
- for (int i = 0; i < n; i++, a++, b++) {
- swap(x, a, b);
- }
- }
+ private void vecswap(int x[], int a, int b, int n) {
+ for (int i = 0; i < n; i++, a++, b++) {
+ swap(x, a, b);
+ }
+ }
- private int compare(int[] tPointers, int tp1, int tp2i, int tp2j, int tp2v) {
- int i1 = tPointers[tp1 * 4];
- int j1 = tPointers[tp1 * 4 + 1];
- int v1 = tPointers[tp1 * 4 + 3];
- if (v1 != tp2v) {
- return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1;
- }
- int i2 = tp2i;
- int j2 = tp2j;
- ByteBuffer buf1 = buffers[i1];
- ByteBuffer buf2 = buffers[i2];
- byte[] b1 = buf1.array();
- byte[] b2 = buf2.array();
- fta1.reset(buf1);
- fta2.reset(buf2);
- for (int f = 0; f < comparators.length; ++f) {
- int fIdx = sortFields[f];
- int f1Start = fIdx == 0 ? 0 : buf1.getInt(j1 + (fIdx - 1) * 4);
- int f1End = buf1.getInt(j1 + fIdx * 4);
- int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;
- int l1 = f1End - f1Start;
- int f2Start = fIdx == 0 ? 0 : buf2.getInt(j2 + (fIdx - 1) * 4);
- int f2End = buf2.getInt(j2 + fIdx * 4);
- int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;
- int l2 = f2End - f2Start;
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
+ private int compare(int[] tPointers, int tp1, int tp2i, int tp2j, int tp2v) {
+ int i1 = tPointers[tp1 * 4];
+ int j1 = tPointers[tp1 * 4 + 1];
+ int v1 = tPointers[tp1 * 4 + 3];
+ if (v1 != tp2v) {
+ return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1
+ : 1;
+ }
+ int i2 = tp2i;
+ int j2 = tp2j;
+ ByteBuffer buf1 = buffers[i1];
+ ByteBuffer buf2 = buffers[i2];
+ byte[] b1 = buf1.array();
+ byte[] b2 = buf2.array();
+ fta1.reset(buf1);
+ fta2.reset(buf2);
+ for (int f = 0; f < comparators.length; ++f) {
+ int fIdx = sortFields[f];
+ int f1Start = fIdx == 0 ? 0 : buf1.getInt(j1 + (fIdx - 1) * 4);
+ int f1End = buf1.getInt(j1 + fIdx * 4);
+ int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;
+ int l1 = f1End - f1Start;
+ int f2Start = fIdx == 0 ? 0 : buf2.getInt(j2 + (fIdx - 1) * 4);
+ int f2End = buf2.getInt(j2 + fIdx * 4);
+ int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;
+ int l2 = f2End - f2Start;
+ int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+ if (c != 0) {
+ return c;
+ }
+ }
+ return 0;
+ }
- public void close() {
- //this.buffers.clear();
- }
+ public void close() {
+ // this.buffers.clear();
+ }
- public int getFlushCount() {
- return flushCount;
- }
+ public int getFlushCount() {
+ return flushCount;
+ }
- /*public void AddRuns(RunFileWriter r){
- try {
- runs.add(r.createReader());
- } catch (HyracksDataException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }*/
+ /*
+ * public void AddRuns(RunFileWriter r){ try { runs.add(r.createReader()); }
+ * catch (HyracksDataException e) { // TODO Auto-generated catch block
+ * e.printStackTrace(); } }
+ */
- public List<IFrameReader> GetRuns() {
- return runs;
- }
+ public List<IFrameReader> GetRuns() {
+ return runs;
+ }
- private void DumpAllBuffers() {
- FrameTupleAccessor tfa = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
+ private void DumpAllBuffers() {
+ FrameTupleAccessor tfa = new FrameTupleAccessor(ctx.getFrameSize(),
+ recordDescriptor);
- for (int i = 0; i < dataFrameCount; i++) {
- tfa.reset(buffers[i]);
- int t = tfa.getTupleCount();
- for (int j = 0; j < t; j++) {
- int tupleOffset = tfa.getTupleStartOffset(j);
+ for (int i = 0; i < dataFrameCount; i++) {
+ tfa.reset(buffers[i]);
+ int t = tfa.getTupleCount();
+ for (int j = 0; j < t; j++) {
+ int tupleOffset = tfa.getTupleStartOffset(j);
- int r = tfa.getFieldStartOffset(j, 0);
- int pos = tupleOffset + r + tfa.getFieldSlotsLength();
- long l = buffers[i].getLong(pos);
- System.out.print(l);
- System.out.print(' ');
+ int r = tfa.getFieldStartOffset(j, 0);
+ int pos = tupleOffset + r + tfa.getFieldSlotsLength();
+ long l = buffers[i].getLong(pos);
+ System.out.print(l);
+ System.out.print(' ');
- r = tfa.getFieldStartOffset(j, 1);
- pos = tupleOffset + r + tfa.getFieldSlotsLength();
- byte b = buffers[i].array()[pos];
- System.out.print(b);
- System.out.print(' ');
+ r = tfa.getFieldStartOffset(j, 1);
+ pos = tupleOffset + r + tfa.getFieldSlotsLength();
+ byte b = buffers[i].array()[pos];
+ System.out.print(b);
+ System.out.print(' ');
- r = tfa.getFieldStartOffset(j, 2);
- pos = tupleOffset + r + tfa.getFieldSlotsLength();
- int o = buffers[i].getInt(pos);
- System.out.print(o);
- System.out.print(' ');
+ r = tfa.getFieldStartOffset(j, 2);
+ pos = tupleOffset + r + tfa.getFieldSlotsLength();
+ int o = buffers[i].getInt(pos);
+ System.out.print(o);
+ System.out.print(' ');
- System.out.println();
- }
- }
- System.out.println("---------------------------------");
- }
+ System.out.println();
+ }
+ }
+ System.out.println("---------------------------------");
+ }
- //functions for dubugging
- // private void DumpBuffer(byte[] f) {
- // int n = f.length;
- //
- // int count = 0;
- // for (int i = 0; i < n; i++) {
- // if (i % 13 == 0) {
- // if (count != 0) {
- // System.out.print(")(");
- // } else {
- // System.out.print("(");
- // }
- // System.out.print(count);
- // System.out.print(':');
- // count += 1;
- // }
- // System.out.print(f[i]);
- // System.out.print(' ');
- // }
- // System.out.println(')');
- // }
+	// functions for debugging
+ // private void DumpBuffer(byte[] f) {
+ // int n = f.length;
+ //
+ // int count = 0;
+ // for (int i = 0; i < n; i++) {
+ // if (i % 13 == 0) {
+ // if (count != 0) {
+ // System.out.print(")(");
+ // } else {
+ // System.out.print("(");
+ // }
+ // System.out.print(count);
+ // System.out.print(':');
+ // count += 1;
+ // }
+ // System.out.print(f[i]);
+ // System.out.print(' ');
+ // }
+ // System.out.println(')');
+ // }
- public void processLastFrame() {
- sortFrames();
- FileReference file;
+ public void processLastFrame() {
+ sortFrames();
+ FileReference file;
- DumpAllBuffers();
- try {
- file = ctx.getJobletContext().createManagedWorkspaceFile(ExternalSortRunGenerator.class.getSimpleName());
- RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());
- writer.open();
- try {
- flushCount += 1;
- flushFrames(writer);
- } finally {
- writer.close();
- }
- runs.add(writer.createReader());
- } catch (HyracksDataException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- //frameSorter.AddRuns((RunFileWriter) writer);
+ DumpAllBuffers();
+ try {
+ file = ctx.getJobletContext().createManagedWorkspaceFile(
+ ExternalSortRunGenerator.class.getSimpleName());
+ RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());
+ writer.open();
+ try {
+ flushCount += 1;
+ flushFrames(writer);
+ } finally {
+ writer.close();
+ }
+ runs.add(writer.createReader());
+ } catch (HyracksDataException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ // frameSorter.AddRuns((RunFileWriter) writer);
- }
+ }
}
\ No newline at end of file
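
insertKmer above is an upsert: probe the hash table for the k-mer; on a hit, OR the adjacency bitmap and bump the count in place; on a miss, append a (kmer, bitmap, 1) tuple and register it in the table. The same logic is sketched below with a plain HashMap standing in for the frame-backed SerializableHashTable; this is illustrative only, not the operator's code.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of insertKmer's aggregation. Entry plays the role of the
    // (bitmap, count) fields stored next to the kmer in the frame.
    public class KmerUpsertSketch {
        static final class Entry {
            byte bitmap;
            int count;
        }

        private final Map<Long, Entry> table = new HashMap<Long, Entry>();

        void insertKmer(long kmer, byte adjacency) {
            Entry e = table.get(kmer);     // SearchHashTable
            if (e != null) {
                e.bitmap |= adjacency;     // merge the neighbour bitmap in place
                e.count += 1;              // bump the occurrence count
            } else {
                e = new Entry();           // new tuple: (kmer, bitmap, count = 1)
                e.bitmap = adjacency;
                e.count = 1;
                table.put(kmer, e);        // InsertHashTable
            }
        }
    }
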
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index 18b09b9..5e00d7e 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -7,7 +7,6 @@
import edu.uci.ics.genomix.data.normalizers.VLongNormalizedKeyComputerFactory;
import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;
import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;
import edu.uci.ics.genomix.data.std.primitive.VLongPointable;
import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
@@ -47,314 +46,370 @@
public class Tester {
- private static final Logger LOGGER = Logger.getLogger(Tester.class.getName());
- public static final String NC1_ID = "nc1";
- public static final String NC2_ID = "nc2";
- public static final String NC3_ID = "nc3";
- public static final String NC4_ID = "nc4";
+ private static final Logger LOGGER = Logger.getLogger(Tester.class
+ .getName());
+ public static final String NC1_ID = "nc1";
+ public static final String NC2_ID = "nc2";
+ public static final String NC3_ID = "nc3";
+ public static final String NC4_ID = "nc4";
- private static ClusterControllerService cc;
- private static NodeControllerService nc1;
- private static NodeControllerService nc2;
- private static NodeControllerService nc3;
- private static NodeControllerService nc4;
- private static IHyracksClientConnection hcc;
+ private static ClusterControllerService cc;
+ private static NodeControllerService nc1;
+ private static NodeControllerService nc2;
+ private static NodeControllerService nc3;
+ private static NodeControllerService nc4;
+ private static IHyracksClientConnection hcc;
- //private static final boolean DEBUG = true;
+ // private static final boolean DEBUG = true;
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) throws Exception {
- LOGGER.setLevel(Level.OFF);
+ LOGGER.setLevel(Level.OFF);
- init();
+ init();
- // Options options = new Options();
+ // Options options = new Options();
- IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);
+ IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);
- /*
- * JobSpecification job =
- * createJob(parseFileSplits(options.inFileCustomerSplits),
- * parseFileSplits(options.inFileOrderSplits),
- * parseFileSplits(options.outFileSplits), options.numJoinPartitions,
- * options.algo, options.graceInputSize, options.graceRecordsPerFrame,
- * options.graceFactor, options.memSize, options.tableSize,
- * options.hasGroupBy);
- */
+ /*
+ * JobSpecification job =
+ * createJob(parseFileSplits(options.inFileCustomerSplits),
+ * parseFileSplits(options.inFileOrderSplits),
+ * parseFileSplits(options.outFileSplits), options.numJoinPartitions,
+ * options.algo, options.graceInputSize, options.graceRecordsPerFrame,
+ * options.graceFactor, options.memSize, options.tableSize,
+ * options.hasGroupBy);
+ */
- int k, page_num;
- String file_name = args[0];
- k = Integer.parseInt(args[1]);
- page_num = Integer.parseInt(args[2]);
- int type = Integer.parseInt(args[3]);
+ int k, page_num;
+ String file_name = args[0];
+ k = Integer.parseInt(args[1]);
+ page_num = Integer.parseInt(args[2]);
+ int type = Integer.parseInt(args[3]);
- JobSpecification job = createJob(file_name, k, page_num, type);
+ JobSpecification job = createJob(file_name, k, page_num, type);
- long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob("test", job);
- hcc.waitForCompletion(jobId);
- long end = System.currentTimeMillis();
- System.err.println(start + " " + end + " " + (end - start));
-
- /*
+ long start = System.currentTimeMillis();
+ JobId jobId = hcc.startJob("test", job);
+ hcc.waitForCompletion(jobId);
+ long end = System.currentTimeMillis();
+ System.err.println(start + " " + end + " " + (end - start));
- String s = "g:\\data\\results.txt" ;
+ /*
+ *
+ * String s = "g:\\data\\results.txt" ;
+ *
+ * filenames = new FileOutputStream(s); // filenames = new
+ * FileInputStream("filename.txt");
+ *
+ * BufferedWriter writer = new BufferedWriter(new
+ * OutputStreamWriter(filenames)); writer.write((int) (end-start));
+ * writer.close();
+ */
- filenames = new FileOutputStream(s);
- // filenames = new FileInputStream("filename.txt");
+ }
- BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(filenames));
- writer.write((int) (end-start));
- writer.close();*/
-
- }
+ public static void init() throws Exception {
+ CCConfig ccConfig = new CCConfig();
+ ccConfig.clientNetIpAddress = "127.0.0.1";
+ ccConfig.clientNetPort = 39000;
+ ccConfig.clusterNetIpAddress = "127.0.0.1";
+ ccConfig.clusterNetPort = 39001;
+ ccConfig.profileDumpPeriod = -1;
+ File outDir = new File("target/ClusterController");
+ outDir.mkdirs();
+ File ccRoot = File.createTempFile(Tester.class.getName(), ".data",
+ outDir);
+ ccRoot.delete();
+ ccRoot.mkdir();
+ ccConfig.ccRoot = ccRoot.getAbsolutePath();
+ cc = new ClusterControllerService(ccConfig);
+ cc.start();
+ ccConfig.defaultMaxJobAttempts = 0;
- public static void init() throws Exception {
- CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = "127.0.0.1";
- ccConfig.clientNetPort = 39000;
- ccConfig.clusterNetIpAddress = "127.0.0.1";
- ccConfig.clusterNetPort = 39001;
- ccConfig.profileDumpPeriod = -1;
- File outDir = new File("target/ClusterController");
- outDir.mkdirs();
- File ccRoot = File.createTempFile(Tester.class.getName(), ".data", outDir);
- ccRoot.delete();
- ccRoot.mkdir();
- ccConfig.ccRoot = ccRoot.getAbsolutePath();
- cc = new ClusterControllerService(ccConfig);
- cc.start();
- ccConfig.defaultMaxJobAttempts = 0;
+ NCConfig ncConfig1 = new NCConfig();
+ ncConfig1.ccHost = "localhost";
+ ncConfig1.ccPort = 39001;
+ ncConfig1.clusterNetIPAddress = "127.0.0.1";
+ ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.nodeId = NC1_ID;
+ nc1 = new NodeControllerService(ncConfig1);
+ nc1.start();
- NCConfig ncConfig1 = new NCConfig();
- ncConfig1.ccHost = "localhost";
- ncConfig1.ccPort = 39001;
- ncConfig1.clusterNetIPAddress = "127.0.0.1";
- ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.nodeId = NC1_ID;
- nc1 = new NodeControllerService(ncConfig1);
- nc1.start();
+ NCConfig ncConfig2 = new NCConfig();
+ ncConfig2.ccHost = "localhost";
+ ncConfig2.ccPort = 39001;
+ ncConfig2.clusterNetIPAddress = "127.0.0.1";
+ ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.nodeId = NC2_ID;
+ nc2 = new NodeControllerService(ncConfig2);
+ nc2.start();
- NCConfig ncConfig2 = new NCConfig();
- ncConfig2.ccHost = "localhost";
- ncConfig2.ccPort = 39001;
- ncConfig2.clusterNetIPAddress = "127.0.0.1";
- ncConfig2.dataIPAddress = "127.0.0.1";
- ncConfig2.nodeId = NC2_ID;
- nc2 = new NodeControllerService(ncConfig2);
- nc2.start();
-
- NCConfig ncConfig3 = new NCConfig();
- ncConfig3.ccHost = "localhost";
- ncConfig3.ccPort = 39001;
- ncConfig3.clusterNetIPAddress = "127.0.0.1";
- ncConfig3.dataIPAddress = "127.0.0.1";
- ncConfig3.nodeId = NC3_ID;
- nc3 = new NodeControllerService(ncConfig3);
- nc3.start();
-
- NCConfig ncConfig4 = new NCConfig();
- ncConfig4.ccHost = "localhost";
- ncConfig4.ccPort = 39001;
- ncConfig4.clusterNetIPAddress = "127.0.0.1";
- ncConfig4.dataIPAddress = "127.0.0.1";
- ncConfig4.nodeId = NC4_ID;
- nc4 = new NodeControllerService(ncConfig4);
- nc4.start();
+ NCConfig ncConfig3 = new NCConfig();
+ ncConfig3.ccHost = "localhost";
+ ncConfig3.ccPort = 39001;
+ ncConfig3.clusterNetIPAddress = "127.0.0.1";
+ ncConfig3.dataIPAddress = "127.0.0.1";
+ ncConfig3.nodeId = NC3_ID;
+ nc3 = new NodeControllerService(ncConfig3);
+ nc3.start();
- hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
- hcc.createApplication("test", null);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
- }
- }
+ NCConfig ncConfig4 = new NCConfig();
+ ncConfig4.ccHost = "localhost";
+ ncConfig4.ccPort = 39001;
+ ncConfig4.clusterNetIPAddress = "127.0.0.1";
+ ncConfig4.dataIPAddress = "127.0.0.1";
+ ncConfig4.nodeId = NC4_ID;
+ nc4 = new NodeControllerService(ncConfig4);
+ nc4.start();
- private static JobSpecification createJob(String filename, int k, int page_num, int type) throws HyracksDataException {
- JobSpecification spec = new JobSpecification();
+ hcc = new HyracksConnection(ccConfig.clientNetIpAddress,
+ ccConfig.clientNetPort);
+ hcc.createApplication("test", null);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
+ }
+ }
-		//spec.setFrameSize(32768);
-		spec.setFrameSize(32768);
- FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
- //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);
+ // spec.setFrameSize(32768);
+ spec.setFrameSize(64);
- RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {null, ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE});
- //Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,
- //ByteSerializerDeserializer.INSTANCE });
+ FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
+ NC1_ID, NC2_ID, NC3_ID, NC4_ID);
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
+ // NC1_ID);
-		int[] keyFields = new int[] { 0 };
-		int frameLimits = 4096; // hyracks oriented
-		//int tableSize = 10485767; // hyracks oriented
-		int tableSize = 2351137; // hyracks oriented
- AbstractOperatorDescriptor single_grouper;
- IConnectorDescriptor conn_partition;
- AbstractOperatorDescriptor cross_grouper;
+ int[] keyFields = new int[] { 0 };
+ int frameLimits = 4096; // hyracks oriented
+ int tableSize = 10485767; // hyracks oriented
-
- if(0 == type){//external group by
- single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
- new VLongNormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
+ AbstractOperatorDescriptor single_grouper;
+ IConnectorDescriptor conn_partition;
+ AbstractOperatorDescriptor cross_grouper;
- new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- outputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(VLongPointable.FACTORY) }), tableSize), true);
-
- conn_partition = new MToNPartitioningConnectorDescriptor(spec,
- new KmerHashPartitioncomputerFactory());
- cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
- new VLongNormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
+ if (0 == type) {// external group by
+ single_grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(VLongPointable.FACTORY) },
+ new VLongNormalizedKeyComputerFactory(),
+ new MergeKmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
- new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- outputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(VLongPointable.FACTORY) }), tableSize), true);
- }
- else if( 1 == type){
- single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
- new VLongNormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- outputRec, new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(VLongPointable.FACTORY) }), tableSize), true);
- conn_partition = new MToNPartitioningMergingConnectorDescriptor(spec, new KmerHashPartitioncomputerFactory(),
- keyFields, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY)} );
- cross_grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
- new DistributedMergeLmerAggregateFactory(),
- outputRec);
- }
- else{
- long inputSizeInRawRecords = 154000000;
- long inputSizeInUniqueKeys = 38500000;
- int recordSizeInBytes = 4;
- int hashfuncStartLevel = 1;
- single_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,
- frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] {new VLongBinaryHashFunctionFamily()},
- //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
- hashfuncStartLevel,
- new VLongNormalizedKeyComputerFactory(),
- new MergeKmerAggregateFactory(),
- new DistributedMergeLmerAggregateFactory(),
- outputRec, true);
- conn_partition = new MToNPartitioningConnectorDescriptor(spec,
- new KmerHashPartitioncomputerFactory());
- recordSizeInBytes = 13;
- cross_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,
- frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] {new VLongBinaryHashFunctionFamily()},
- //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
- hashfuncStartLevel,
- new VLongNormalizedKeyComputerFactory(),
- new DistributedMergeLmerAggregateFactory(),
- new DistributedMergeLmerAggregateFactory(),
- outputRec, true);
- }
-
- //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
-
- IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(spec);
- spec.connect(readfileConn, scan, 0, single_grouper, 0);
-
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(VLongPointable.FACTORY) }),
+ tableSize), true);
- //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper,NC1_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
- spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);
+ conn_partition = new MToNPartitioningConnectorDescriptor(spec,
+ new KmerHashPartitioncomputerFactory());
+ cross_grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(VLongPointable.FACTORY) },
+ new VLongNormalizedKeyComputerFactory(),
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
- //PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec, "G:\\data\\result");
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
- //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(VLongPointable.FACTORY) }),
+ tableSize), true);
+ } else if (1 == type) {
+ single_grouper = new ExternalGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(VLongPointable.FACTORY) },
+ new VLongNormalizedKeyComputerFactory(),
+ new MergeKmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ new DistributedMergeLmerAggregateFactory(),
+ // new IntSumFieldAggregatorFactory(1, false) }),
+ outputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(VLongPointable.FACTORY) }),
+ tableSize), true);
+ conn_partition = new MToNPartitioningMergingConnectorDescriptor(
+ spec,
+ new KmerHashPartitioncomputerFactory(),
+ keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(VLongPointable.FACTORY) });
+ cross_grouper = new PreclusteredGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(VLongPointable.FACTORY) },
+ new DistributedMergeLmerAggregateFactory(), outputRec);
+ } else {
+ long inputSizeInRawRecords = 154000000;
+ long inputSizeInUniqueKeys = 38500000;
+ int recordSizeInBytes = 4;
+ int hashfuncStartLevel = 1;
+ single_grouper = new HybridHashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ inputSizeInRawRecords,
+ inputSizeInUniqueKeys,
+ recordSizeInBytes,
+ tableSize,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(VLongPointable.FACTORY) },
+ new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
+ // new IBinaryHashFunctionFamily[]
+ // {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
+ hashfuncStartLevel,
+ new VLongNormalizedKeyComputerFactory(),
+ new MergeKmerAggregateFactory(),
+ new DistributedMergeLmerAggregateFactory(), outputRec, true);
+ conn_partition = new MToNPartitioningConnectorDescriptor(spec,
+ new KmerHashPartitioncomputerFactory());
+ recordSizeInBytes = 13;
+ cross_grouper = new HybridHashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ frameLimits,
+ inputSizeInRawRecords,
+ inputSizeInUniqueKeys,
+ recordSizeInBytes,
+ tableSize,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(VLongPointable.FACTORY) },
+ new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
+ // new IBinaryHashFunctionFamily[]
+ // {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
+ hashfuncStartLevel,
+ new VLongNormalizedKeyComputerFactory(),
+ new DistributedMergeLmerAggregateFactory(),
+ new DistributedMergeLmerAggregateFactory(), outputRec, true);
+ }
- IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);
- spec.connect(printConn, cross_grouper, 0, printer, 0);
- //spec.connect(readfileConn, scan, 0, printer, 0);
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ // single_grouper, NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ single_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);
- spec.addRoot(printer);
+ IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
+ spec);
+ spec.connect(readfileConn, scan, 0, single_grouper, 0);
- if( 1 == type ){
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- }
- // System.out.println(spec.toString());
- return spec;
- }
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ // cross_grouper,NC1_ID);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ cross_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);
+ spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);
+ // PrinterOperatorDescriptor printer = new
+ // PrinterOperatorDescriptor(spec, "G:\\data\\result");
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ NC1_ID, NC2_ID, NC3_ID, NC4_ID);
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ // printer, NC1_ID);
- static class JoinComparatorFactory implements ITuplePairComparatorFactory {
- private static final long serialVersionUID = 1L;
+ IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(printConn, cross_grouper, 0, printer, 0);
+ // spec.connect(readfileConn, scan, 0, printer, 0);
- private final IBinaryComparatorFactory bFactory;
- private final int pos0;
- private final int pos1;
+ spec.addRoot(printer);
- public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
- this.bFactory = bFactory;
- this.pos0 = pos0;
- this.pos1 = pos1;
- }
+ if (1 == type) {
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ }
+ // System.out.println(spec.toString());
+ return spec;
+ }
- @Override
- public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
- return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
- }
- }
+ static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+ private static final long serialVersionUID = 1L;
- static class JoinComparator implements ITuplePairComparator {
+ private final IBinaryComparatorFactory bFactory;
+ private final int pos0;
+ private final int pos1;
- private final IBinaryComparator bComparator;
- private final int field0;
- private final int field1;
+ public JoinComparatorFactory(IBinaryComparatorFactory bFactory,
+ int pos0, int pos1) {
+ this.bFactory = bFactory;
+ this.pos0 = pos0;
+ this.pos1 = pos1;
+ }
- public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
- this.bComparator = bComparator;
- this.field0 = field0;
- this.field1 = field1;
- }
+ @Override
+ public ITuplePairComparator createTuplePairComparator(
+ IHyracksTaskContext ctx) {
+ return new JoinComparator(bFactory.createBinaryComparator(), pos0,
+ pos1);
+ }
+ }
- @Override
- public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
- int tStart0 = accessor0.getTupleStartOffset(tIndex0);
- int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+ static class JoinComparator implements ITuplePairComparator {
- int tStart1 = accessor1.getTupleStartOffset(tIndex1);
- int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+ private final IBinaryComparator bComparator;
+ private final int field0;
+ private final int field1;
- int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
- int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
- int fLen0 = fEnd0 - fStart0;
+ public JoinComparator(IBinaryComparator bComparator, int field0,
+ int field1) {
+ this.bComparator = bComparator;
+ this.field0 = field0;
+ this.field1 = field1;
+ }
- int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
- int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
- int fLen1 = fEnd1 - fStart1;
+ @Override
+ public int compare(IFrameTupleAccessor accessor0, int tIndex0,
+ IFrameTupleAccessor accessor1, int tIndex1) {
+ int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+ int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
- int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
- .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
- if (c != 0) {
- return c;
- }
- return 0;
- }
- }
+ int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+ int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+ int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+ int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
+ int fLen0 = fEnd0 - fStart0;
+
+ int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+ int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
+ int fLen1 = fEnd1 - fStart1;
+
+ int c = bComparator.compare(accessor0.getBuffer().array(), fStart0
+ + fStartOffset0, fLen0, accessor1.getBuffer().array(),
+ fStart1 + fStartOffset1, fLen1);
+ if (c != 0) {
+ return c;
+ }
+ return 0;
+ }
+ }
}
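For orientation, the three type branches above assemble the same two-stage pipeline and differ only in the group-by operator and connector: type 0 runs ExternalGroupOperatorDescriptor on both stages behind a plain hash partitioner; type 1 swaps in a sort-merging partitioner so the second stage can be the cheaper PreclusteredGroupOperatorDescriptor; any other value uses HybridHashGroupOperatorDescriptor, sized by the estimated raw-record and unique-key counts. A minimal sketch of the shared wiring, using only constructors that appear in this patch (the helper name is illustrative, not part of the codebase):

    // Illustrative sketch: every branch builds the dataflow
    // scan -> local grouper -> shuffle -> global grouper -> printer.
    private static void wirePipeline(JobSpecification spec,
            AbstractOperatorDescriptor scan,
            AbstractOperatorDescriptor localGrouper,
            IConnectorDescriptor partition,
            AbstractOperatorDescriptor globalGrouper,
            PrinterOperatorDescriptor printer) {
        // local aggregation stays on the partition that read the data
        spec.connect(new OneToOneConnectorDescriptor(spec), scan, 0,
                localGrouper, 0);
        // shuffle partial counts so equal k-mers meet on one node
        spec.connect(partition, localGrouper, 0, globalGrouper, 0);
        // globally merged counts feed the printer sink
        spec.connect(new OneToOneConnectorDescriptor(spec), globalGrouper,
                0, printer, 0);
        spec.addRoot(printer);
    }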
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
index 792d033..c715dbd 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/DistributedMergeLmerAggregateFactory.java
@@ -10,160 +10,180 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
-public class DistributedMergeLmerAggregateFactory implements IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
- private static final int max = 255;
+public class DistributedMergeLmerAggregateFactory implements
+ IAggregatorDescriptorFactory {
+ private static final long serialVersionUID = 1L;
+ private static final int max = 255;
- public DistributedMergeLmerAggregateFactory() {
- }
+ public DistributedMergeLmerAggregateFactory() {
+ }
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
- throws HyracksDataException {
- return new IAggregatorDescriptor() {
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+ RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields,
+ int[] keyFieldsInPartialResults) throws HyracksDataException {
+ return new IAggregatorDescriptor() {
- @Override
- public void reset() {
- }
+ @Override
+ public void reset() {
+ }
- @Override
- public void close() {
- // TODO Auto-generated method stub
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
- }
+ }
- @Override
- public AggregateState createAggregateStates() {
- // TODO Auto-generated method stub
- return new AggregateState(new Object() {
- });
- }
+ @Override
+ public AggregateState createAggregateStates() {
+ // TODO Auto-generated method stub
+ return new AggregateState(new Object() {
+ });
+ }
- @Override
- public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = 0;
- byte count = 0;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
- bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
- tupleOffset + accessor.getFieldSlotsLength() + fieldStart);
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ byte bitmap = 0;
+ byte count = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+ bitmap |= ByteSerializerDeserializer.getByte(accessor
+ .getBuffer().array(),
+ tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart);
- tupleOffset = accessor.getTupleStartOffset(tIndex);
- fieldStart = accessor.getFieldStartOffset(tIndex, 2);
- int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ tupleOffset = accessor.getTupleStartOffset(tIndex);
+ fieldStart = accessor.getFieldStartOffset(tIndex, 2);
+ int offset = tupleOffset + fieldStart
+ + accessor.getFieldSlotsLength();
- count += ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
+ count += ByteSerializerDeserializer.getByte(accessor
+ .getBuffer().array(), offset);
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when initializing the aggregator.");
- }
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when initializing the aggregator.");
+ }
- }
+ }
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
- int stateTupleIndex, AggregateState state) throws HyracksDataException {
- // TODO Auto-generated method stub
-
- byte bitmap = 0;
- byte count = 0;
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+ IFrameTupleAccessor stateAccessor, int stateTupleIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
- int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
- bitmap |= ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
+ byte bitmap = 0;
+ byte count = 0;
- tupleOffset = accessor.getTupleStartOffset(tIndex);
- fieldStart = accessor.getFieldStartOffset(tIndex, 2);
- offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
- count = ByteSerializerDeserializer.getByte(accessor.getBuffer().array(), offset);
-
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+ int offset = tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart;
+ bitmap |= ByteSerializerDeserializer.getByte(accessor
+ .getBuffer().array(), offset);
- int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
- int statefieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
- int stateoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + statefieldStart;
+ tupleOffset = accessor.getTupleStartOffset(tIndex);
+ fieldStart = accessor.getFieldStartOffset(tIndex, 2);
+ offset = tupleOffset + fieldStart
+ + accessor.getFieldSlotsLength();
+ count = ByteSerializerDeserializer.getByte(accessor.getBuffer()
+ .array(), offset);
- byte[] data = stateAccessor.getBuffer().array();
+ int statetupleOffset = stateAccessor
+ .getTupleStartOffset(stateTupleIndex);
+ int statefieldStart = stateAccessor.getFieldStartOffset(
+ stateTupleIndex, 1);
+ int stateoffset = statetupleOffset
+ + stateAccessor.getFieldSlotsLength() + statefieldStart;
- ByteBuffer buf = ByteBuffer.wrap(data);
- bitmap |= buf.getChar(stateoffset);
- buf.position(stateoffset+1);
- count += buf.get();
-
- if(count > max){
- count = (byte) max;
- }
-
- buf.put(stateoffset, bitmap);
- buf.put(stateoffset + 1, count);
- }
+ byte[] data = stateAccessor.getBuffer().array();
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- // TODO Auto-generated method stub
- byte bitmap;
- byte count;
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- byte[] data = accessor.getBuffer().array();
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ bitmap |= buf.getChar(stateoffset);
+ buf.position(stateoffset + 1);
+ count += buf.get();
- int offset = fieldOffset + accessor.getFieldSlotsLength() + tupleOffset;
- bitmap = ByteSerializerDeserializer.getByte(data, offset);
+ if (count > max) {
+ count = (byte) max;
+ }
- count = ByteSerializerDeserializer.getByte(data, offset + 1);
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
- }
+ buf.put(stateoffset, bitmap);
+ buf.put(stateoffset + 1, count);
+ }
- }
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ byte bitmap;
+ byte count;
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ byte[] data = accessor.getBuffer().array();
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- // TODO Auto-generated method stub
- byte bitmap;
- byte count;
+ int offset = fieldOffset + accessor.getFieldSlotsLength()
+ + tupleOffset;
+ bitmap = ByteSerializerDeserializer.getByte(data, offset);
- byte[] data = accessor.getBuffer().array();
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);
- int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldOffset;
+ count = ByteSerializerDeserializer.getByte(data, offset + 1);
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when writing aggregation to the output buffer.");
+ }
- bitmap = ByteSerializerDeserializer.getByte(data, offset);
- count = ByteSerializerDeserializer.getByte(data, offset + 1);
+ }
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
- }
- }
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ byte bitmap;
+ byte count;
- };
- }
+ byte[] data = accessor.getBuffer().array();
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);
+ int offset = tupleOffset + accessor.getFieldSlotsLength()
+ + fieldOffset;
+
+ bitmap = ByteSerializerDeserializer.getByte(data, offset);
+ count = ByteSerializerDeserializer.getByte(data, offset + 1);
+
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ };
+ }
}
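The aggregator above merges partial results produced on other partitions: field 1 carries an adjacency bitmap combined with bitwise OR, and field 2 a one-byte occurrence count that the max = 255 constant suggests should saturate. As committed, buf.getChar(stateoffset) reads two bytes where one is intended, and count is a signed byte, so the count > max guard can never fire; the sketch below shows the apparently intended byte-level merge (an interpretation, not the committed code):

    // Illustrative sketch of the intended merge of two partial states,
    // assuming the saturating-count semantics implied by max = 255.
    static byte mergeBitmap(byte a, byte b) {
        return (byte) (a | b);             // union of adjacency bits
    }

    static byte mergeCount(byte a, byte b) {
        int sum = (a & 0xFF) + (b & 0xFF); // widen to avoid byte overflow
        return (byte) Math.min(sum, 255);  // saturate instead of wrapping
    }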
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
index 1182cb1..b096887 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -10,153 +10,166 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
- private static final long serialVersionUID = 1L;
- private static final int max = 255;
+ private static final long serialVersionUID = 1L;
+ private static final int max = 255;
- public MergeKmerAggregateFactory() {
- }
+ public MergeKmerAggregateFactory() {
+ }
- @Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
- throws HyracksDataException {
- return new IAggregatorDescriptor() {
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
+ RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields,
+ int[] keyFieldsInPartialResults) throws HyracksDataException {
+ return new IAggregatorDescriptor() {
- @Override
- public void reset() {
- }
+ @Override
+ public void reset() {
+ }
- @Override
- public void close() {
- // TODO Auto-generated method stub
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
- }
+ }
- @Override
- public AggregateState createAggregateStates() {
- // TODO Auto-generated method stub
- return new AggregateState(new Object() {
- });
- }
+ @Override
+ public AggregateState createAggregateStates() {
+ // TODO Auto-generated method stub
+ return new AggregateState(new Object() {
+ });
+ }
- @Override
- public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- byte bitmap = 0;
- byte count = 0;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
-
- bitmap |= accessor.getBuffer().get(tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
-
- count += 1;
-
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when initializing the aggregator.");
- }
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ byte bitmap = 0;
+ byte count = 0;
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
- }
+ bitmap |= accessor.getBuffer().get(
+ tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart);
- @Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
- int stateTupleIndex, AggregateState state) throws HyracksDataException {
- // TODO Auto-generated method stub
- byte bitmap = 0;
- byte count = 0;
+ count += 1;
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
-
- bitmap |= accessor.getBuffer().get(tupleOffset + accessor.getFieldSlotsLength()
- + fieldStart);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when initializing the aggregator.");
+ }
- int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
- int statefieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1);
- int stateoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + statefieldStart;
-
-
- count += 1;
+ }
- byte[] data = stateAccessor.getBuffer().array();
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex,
+ IFrameTupleAccessor stateAccessor, int stateTupleIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ byte bitmap = 0;
+ byte count = 0;
- ByteBuffer buf = ByteBuffer.wrap(data);
- bitmap |= buf.getChar(stateoffset);
- buf.position(stateoffset+1);
- count += buf.get();
-
- if( count > max){
- count = (byte)max;
- }
-
- buf.put(stateoffset, bitmap);
- buf.put(stateoffset + 1, count);
- }
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
- @Override
- public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- // TODO Auto-generated method stub
- byte bitmap;
- byte count;
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- byte[] data = accessor.getBuffer().array();
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);
+ bitmap |= accessor.getBuffer().get(
+ tupleOffset + accessor.getFieldSlotsLength()
+ + fieldStart);
- int offset = fieldOffset + accessor.getFieldSlotsLength() + tupleOffset;
- bitmap = ByteSerializerDeserializer.getByte(data, offset);
- count = ByteSerializerDeserializer.getByte(data, offset + 1);
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
- }
+ int statetupleOffset = stateAccessor
+ .getTupleStartOffset(stateTupleIndex);
+ int statefieldStart = stateAccessor.getFieldStartOffset(
+ stateTupleIndex, 1);
+ int stateoffset = statetupleOffset
+ + stateAccessor.getFieldSlotsLength() + statefieldStart;
- }
+ count += 1;
- @Override
- public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
- // TODO Auto-generated method stub
- byte bitmap;
- byte count;
+ byte[] data = stateAccessor.getBuffer().array();
- byte[] data = accessor.getBuffer().array();
- int tupleOffset = accessor.getTupleStartOffset(tIndex);
- int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);
- int offset = tupleOffset + accessor.getFieldSlotsLength() + fieldOffset;
+ ByteBuffer buf = ByteBuffer.wrap(data);
+ bitmap |= buf.getChar(stateoffset);
+ buf.position(stateoffset + 1);
+ count += buf.get();
- bitmap = ByteSerializerDeserializer.getByte(data, offset);
- count = ByteSerializerDeserializer.getByte(data, offset + 1);
+ if (count > max) {
+ count = (byte) max;
+ }
- DataOutput fieldOutput = tupleBuilder.getDataOutput();
- try {
- fieldOutput.writeByte(bitmap);
- tupleBuilder.addFieldEndOffset();
- fieldOutput.writeByte(count);
- tupleBuilder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
- }
- }
+ buf.put(stateoffset, bitmap);
+ buf.put(stateoffset + 1, count);
+ }
- };
- }
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ byte bitmap;
+ byte count;
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ byte[] data = accessor.getBuffer().array();
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);
+
+ int offset = fieldOffset + accessor.getFieldSlotsLength()
+ + tupleOffset;
+ bitmap = ByteSerializerDeserializer.getByte(data, offset);
+ count = ByteSerializerDeserializer.getByte(data, offset + 1);
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when writing aggregation to the output buffer.");
+ }
+
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder,
+ IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ byte bitmap;
+ byte count;
+
+ byte[] data = accessor.getBuffer().array();
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, 1);
+ int offset = tupleOffset + accessor.getFieldSlotsLength()
+ + fieldOffset;
+
+ bitmap = ByteSerializerDeserializer.getByte(data, offset);
+ count = ByteSerializerDeserializer.getByte(data, offset + 1);
+
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeByte(bitmap);
+ tupleBuilder.addFieldEndOffset();
+ fieldOutput.writeByte(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException(
+ "I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ };
+ }
}
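MergeKmerAggregateFactory is the first-stage counterpart: init() seeds the state from a single read occurrence (the count starts at 1), aggregate() folds further occurrences in, and the second-stage DistributedMergeLmerAggregateFactory above then adds whole partial counts instead. A condensed sketch of the per-k-mer state transition with the Hyracks frame bookkeeping stripped away (illustrative, not the committed code):

    // Illustrative sketch: local aggregation state per distinct k-mer,
    // a two-byte (adjacency bitmap, occurrence count) pair.
    static final int MAX_COUNT = 255;

    // first occurrence: adopt the adjacency byte, start the count at 1
    static byte[] init(byte adjacencyBitmap) {
        return new byte[] { adjacencyBitmap, 1 };
    }

    // further occurrences: OR the adjacency bits, bump the saturating count
    static void aggregate(byte[] state, byte adjacencyBitmap) {
        state[0] |= adjacencyBitmap;
        int count = (state[1] & 0xFF) + 1; // widen before comparing
        state[1] = (byte) Math.min(count, MAX_COUNT);
    }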
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
index 6d0585e..927cd36 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -1,17 +1,12 @@
package edu.uci.ics.genomix.driver;
-import java.io.IOException;
import java.net.URL;
import java.util.EnumSet;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
-import org.kohsuke.args4j.Option;
import edu.uci.ics.genomix.job.GenomixJob;
import edu.uci.ics.genomix.job.JobGen;
@@ -25,109 +20,122 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
public class Driver {
- public static enum Plan {
- BUILD_DEBRUJIN_GRAPH,
- GRAPH_CLEANNING,
- CONTIGS_GENERATION,
- }
-
- private static final String IS_PROFILING = "genomix.driver.profiling";
- private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
- private static final String applicationName = GenomixJob.JOB_NAME;
- private static final Log LOG = LogFactory.getLog(Driver.class);
- private JobGen jobGen;
- private boolean profiling;
-
- private int numPartitionPerMachine;
-
- private IHyracksClientConnection hcc;
-
- public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException{
- try{
- hcc = new HyracksConnection(ipAddress, port);
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- this.numPartitionPerMachine = numPartitionPerMachine;
- }
-
- public void runJob(GenomixJob job) throws HyracksException {
- runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
- }
+ public static enum Plan {
+ BUILD_DEBRUJIN_GRAPH, GRAPH_CLEANNING, CONTIGS_GENERATION,
+ }
- public void runJob(GenomixJob job, Plan planChoice, boolean profiling)
- throws HyracksException {
- /** add hadoop configurations */
- //TODO need to include the hadoophome to the classpath in the way below
- URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
- job.getConfiguration().addResource(hadoopCore);
- URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
- job.getConfiguration().addResource(hadoopMapRed);
- URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
- job.getConfiguration().addResource(hadoopHdfs);
+ private static final String IS_PROFILING = "genomix.driver.profiling";
+ private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
+ private static final String applicationName = GenomixJob.JOB_NAME;
+ private static final Log LOG = LogFactory.getLog(Driver.class);
+ private JobGen jobGen;
+ private boolean profiling;
- LOG.info("job started");
- long start = System.currentTimeMillis();
- long end = start;
- long time = 0;
+ private int numPartitionPerMachine;
- this.profiling = profiling;
- try {
- Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
- switch (planChoice) {
- case BUILD_DEBRUJIN_GRAPH:default:
- jobGen = new JobGenBrujinGraph(job, ncMap, numPartitionPerMachine);
- break;
- case GRAPH_CLEANNING:
- jobGen = new JobGenGraphCleanning(job);
- break;
- case CONTIGS_GENERATION:
- jobGen = new JobGenContigsGeneration(job);
- break;
- }
-
- start = System.currentTimeMillis();
- runCreate(jobGen);
- end = System.currentTimeMillis();
- time = end - start;
- LOG.info("result writing finished " + time + "ms");
- LOG.info("job finished");
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- private void runCreate(JobGen jobGen) throws Exception {
- try {
- JobSpecification createJob = jobGen.generateJob();
- execute(createJob);
- } catch (Exception e) {
- throw e;
- }
- }
+ private IHyracksClientConnection hcc;
+ private Scheduler scheduler;
- private void execute(JobSpecification job) throws Exception {
- job.setUseConnectorPolicyForScheduling(false);
- JobId jobId = hcc.startJob(applicationName, job,
- profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
- hcc.waitForCompletion(jobId);
- }
-
- public static void main(String [] args) throws Exception{
- GenomixJob job = new GenomixJob();
- String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
- if ( otherArgs.length < 2){
- System.err.println("Need <serverIP> <port>");
- System.exit(-1);
- }
- String ipAddress = otherArgs[0];
- int port = Integer.parseInt(otherArgs[1]);
- int numOfDuplicate = job.getConfiguration().getInt(CPARTITION_PER_MACHINE, 2);
- boolean bProfiling = job.getConfiguration().getBoolean(IS_PROFILING, true);
-
- Driver driver = new Driver(ipAddress, port, numOfDuplicate);
- driver.runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
- }
+ public Driver(String ipAddress, int port, int numPartitionPerMachine)
+ throws HyracksException {
+ try {
+ hcc = new HyracksConnection(ipAddress, port);
+ scheduler = new Scheduler(ipAddress, port);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ this.numPartitionPerMachine = numPartitionPerMachine;
+ }
+
+ public void runJob(GenomixJob job) throws HyracksException {
+ runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
+ }
+
+ public void runJob(GenomixJob job, Plan planChoice, boolean profiling)
+ throws HyracksException {
+ /** add hadoop configurations */
+ URL hadoopCore = job.getClass().getClassLoader()
+ .getResource("core-site.xml");
+ job.getConfiguration().addResource(hadoopCore);
+ URL hadoopMapRed = job.getClass().getClassLoader()
+ .getResource("mapred-site.xml");
+ job.getConfiguration().addResource(hadoopMapRed);
+ URL hadoopHdfs = job.getClass().getClassLoader()
+ .getResource("hdfs-site.xml");
+ job.getConfiguration().addResource(hadoopHdfs);
+
+ LOG.info("job started");
+ long start = System.currentTimeMillis();
+ long end = start;
+ long time = 0;
+
+ this.profiling = profiling;
+ try {
+ Map<String, NodeControllerInfo> ncMap = hcc
+ .getNodeControllerInfos();
+ switch (planChoice) {
+ case BUILD_DEBRUJIN_GRAPH:
+ default:
+ jobGen = new JobGenBrujinGraph(job, scheduler, ncMap,
+ numPartitionPerMachine);
+ break;
+ case GRAPH_CLEANNING:
+ jobGen = new JobGenGraphCleanning(job);
+ break;
+ case CONTIGS_GENERATION:
+ jobGen = new JobGenContigsGeneration(job);
+ break;
+ }
+
+ start = System.currentTimeMillis();
+ runCreate(jobGen);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("result writing finished " + time + "ms");
+ LOG.info("job finished");
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ private void runCreate(JobGen jobGen) throws Exception {
+ try {
+ JobSpecification createJob = jobGen.generateJob();
+ execute(createJob);
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
+ private void execute(JobSpecification job) throws Exception {
+ job.setUseConnectorPolicyForScheduling(false);
+ JobId jobId = hcc.startJob(
+ applicationName,
+ job,
+ profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
+ .noneOf(JobFlag.class));
+ hcc.waitForCompletion(jobId);
+ }
+
+ public static void main(String[] args) throws Exception {
+ GenomixJob job = new GenomixJob();
+ String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),
+ args).getRemainingArgs();
+ if (otherArgs.length < 2) {
+ System.err.println("Need <serverIP> <port>");
+ System.exit(-1);
+ }
+ String ipAddress = otherArgs[0];
+ int port = Integer.parseInt(otherArgs[1]);
+ int numOfDuplicate = job.getConfiguration().getInt(
+ CPARTITION_PER_MACHINE, 2);
+ boolean bProfiling = job.getConfiguration().getBoolean(IS_PROFILING,
+ true);
+
+ Driver driver = new Driver(ipAddress, port, numOfDuplicate);
+ driver.runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
+ }
}
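With the hyracks-hdfs Scheduler created alongside the HyracksConnection, the driver needs only the cluster controller endpoint to plan HDFS-local reads. A minimal programmatic invocation mirroring main() (host, port, and partition count are example values, not defaults from the code):

    // Illustrative usage; the endpoint and partition count are examples.
    public static void runExample() throws Exception {
        GenomixJob job = new GenomixJob();
        Driver driver = new Driver("127.0.0.1", 3099, 2);
        driver.runJob(job, Driver.Plan.BUILD_DEBRUJIN_GRAPH, false);
    }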
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
index 67527a7..bdc8202 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/GenomixJob.java
@@ -6,7 +6,7 @@
import org.apache.hadoop.mapreduce.Job;
public class GenomixJob extends Job {
-
+
public static final String JOB_NAME = "genomix";
/** K-mer length */
@@ -19,7 +19,6 @@
public static final String TABLE_SIZE = "genomix.tablesize";
/** Group-by type */
public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
-
/** Configurations used by the hybrid group-by function in the graph build phase */
public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
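These keys are ordinary Hadoop Configuration entries, so a job generator reads them with the usual typed getters. A hedged sketch (the defaults shown are borrowed from the test values elsewhere in this patch, not constants defined by GenomixJob):

    // Illustrative sketch; the default values here are examples only.
    Configuration conf = job.getConfiguration();
    int tableSize = conf.getInt(GenomixJob.TABLE_SIZE, 10485767);
    String groupbyType = conf.get(GenomixJob.GROUPBY_TYPE, "external");
    long hybridInputSize = conf.getLong(
            GenomixJob.GROUPBY_HYBRID_INPUTSIZE, 154000000L);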
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGen.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGen.java
index 66bf79d..6933541 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGen.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGen.java
@@ -4,11 +4,11 @@
import org.apache.hadoop.conf.Configuration;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
public abstract class JobGen {
-
+
protected final Configuration conf;
protected final GenomixJob genomixJob;
protected String jobId = new UUID(System.currentTimeMillis(),
@@ -22,6 +22,6 @@
protected abstract void initJobConfiguration();
- public abstract JobSpecification generateJob () throws HyracksDataException;
+ public abstract JobSpecification generateJob() throws HyracksException;
}
\ No newline at end of file
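Since generateJob() now throws the broader HyracksException, subclasses can surface both dataflow and client-side failures without wrapping them in HyracksDataException. The minimal shape of a subclass after this change (the class name is invented for illustration):

    // Illustrative subclass; JobGenNoop is not part of the codebase.
    public class JobGenNoop extends JobGen {
        public JobGenNoop(GenomixJob job) {
            super(job);
        }

        @Override
        protected void initJobConfiguration() {
            // nothing to configure in this toy generator
        }

        @Override
        public JobSpecification generateJob() throws HyracksException {
            return new JobSpecification(); // an empty, valid job
        }
    }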
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index 067ad37..a1b28f7 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -3,6 +3,8 @@
import java.util.Map;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -11,8 +13,9 @@
import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;
import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.genomix.dataflow.FileScanDescriptor;
+import edu.uci.ics.genomix.dataflow.KMerWriterFactory;
import edu.uci.ics.genomix.dataflow.PrinterOperatorDescriptor;
+import edu.uci.ics.genomix.dataflow.ReadsKeyValueParserFactory;
import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
@@ -24,11 +27,11 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
@@ -39,6 +42,9 @@
import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
public class JobGenBrujinGraph extends JobGen {
public enum GroupbyType {
@@ -46,8 +52,9 @@
}
private final Map<String, NodeControllerInfo> ncMap;
- private String [] ncNodeNames;
-
+ private Scheduler scheduler;
+ private String[] ncNodeNames;
+
private int kmers;
private int frameLimits;
private int tableSize;
@@ -60,20 +67,24 @@
private AbstractOperatorDescriptor crossGrouper;
private RecordDescriptor outputRec;
- public JobGenBrujinGraph(GenomixJob job,
- final Map<String, NodeControllerInfo> ncMap, int numPartitionPerMachine) {
+ public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler,
+ final Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) {
super(job);
this.ncMap = ncMap;
- String [] nodes = new String[ncMap.size()];
+ this.scheduler = scheduler;
+ String[] nodes = new String[ncMap.size()];
ncMap.keySet().toArray(nodes);
ncNodeNames = new String[nodes.length * numPartitionPerMachine];
- for (int i = 0; i < numPartitionPerMachine; i++){
- System.arraycopy(nodes, 0, ncNodeNames, i*nodes.length, nodes.length);
+ for (int i = 0; i < numPartitionPerMachine; i++) {
+ System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length,
+ nodes.length);
}
}
private ExternalGroupOperatorDescriptor newExternalGroupby(
- JobSpecification jobSpec, int[] keyFields, IAggregatorDescriptorFactory aggeragater) {
+ JobSpecification jobSpec, int[] keyFields,
+ IAggregatorDescriptorFactory aggeragater) {
return new ExternalGroupOperatorDescriptor(
jobSpec,
keyFields,
@@ -120,19 +131,18 @@
throws HyracksDataException {
int[] keyFields = new int[] { 0 }; // the id of grouped key
- outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
- Integer64SerializerDeserializer.INSTANCE,
- ByteSerializerDeserializer.INSTANCE,
- ByteSerializerDeserializer.INSTANCE });
switch (groupbyType) {
case EXTERNAL:
- singleGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+ singleGrouper = newExternalGroupby(jobSpec, keyFields,
+ new MergeKmerAggregateFactory());
connPartition = new MToNPartitioningConnectorDescriptor(jobSpec,
new KmerHashPartitioncomputerFactory());
- crossGrouper = newExternalGroupby(jobSpec, keyFields,new DistributedMergeLmerAggregateFactory());
+ crossGrouper = newExternalGroupby(jobSpec, keyFields,
+ new DistributedMergeLmerAggregateFactory());
break;
case PRECLUSTER:
- singleGrouper = newExternalGroupby(jobSpec, keyFields,new MergeKmerAggregateFactory());
+ singleGrouper = newExternalGroupby(jobSpec, keyFields,
+ new MergeKmerAggregateFactory());
connPartition = new MToNPartitioningMergingConnectorDescriptor(
jobSpec,
new KmerHashPartitioncomputerFactory(),
@@ -172,34 +182,58 @@
}
}
+ public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec)
+ throws HyracksDataException {
+ try {
+ outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ null, ByteSerializerDeserializer.INSTANCE,
+ ByteSerializerDeserializer.INSTANCE });
+
+ InputSplit[] splits = ((JobConf) conf).getInputFormat().getSplits(
+ (JobConf) conf, ncNodeNames.length);
+
+ String[] readSchedule = scheduler.getLocationConstraints(splits);
+ return new HDFSReadOperatorDescriptor(jobSpec, outputRec,
+ (JobConf) conf, splits, readSchedule,
+ new ReadsKeyValueParserFactory(kmers));
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
@Override
- public JobSpecification generateJob() throws HyracksDataException {
-
+ public JobSpecification generateJob() throws HyracksException {
+
JobSpecification jobSpec = new JobSpecification();
- //File input
- FileScanDescriptor scan = new FileScanDescriptor(jobSpec, kmers, inputPaths);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, scan,ncNodeNames);
-
+ // File input
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+ readOperator, ncNodeNames);
+
generateDescriptorbyType(jobSpec);
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
singleGrouper, ncNodeNames);
IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
jobSpec);
- jobSpec.connect(readfileConn, scan, 0, singleGrouper, 0);
+ jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
crossGrouper, ncNodeNames);
jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
- //Output
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(jobSpec,
- outputPath.getName());
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, printer,
- ncNodeNames);
+ // Output
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(
+ jobSpec, outputPath.getName());
+ HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(
+ jobSpec, (JobConf) conf, new KMerWriterFactory());
- IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+ printer, ncNodeNames);
+
+ IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(
+ jobSpec);
jobSpec.connect(printConn, crossGrouper, 0, printer, 0);
jobSpec.addRoot(printer);
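The essential change in this file is the read path: instead of the custom FileScanDescriptor, the job asks the Hadoop InputFormat for splits and lets the hyracks-hdfs Scheduler pin each split to a node controller. Condensed, the handshake performed by createHDFSReader above is:

    // Condensed restatement of createHDFSReader; conf must already carry
    // the input paths (set through FileInputFormat by the caller).
    JobConf jobConf = (JobConf) conf;
    InputSplit[] splits = jobConf.getInputFormat().getSplits(jobConf,
            ncNodeNames.length);
    // one location constraint per split, favouring data-local reads
    String[] readSchedule = scheduler.getLocationConstraints(splits);
    HDFSReadOperatorDescriptor readOperator = new HDFSReadOperatorDescriptor(
            jobSpec, outputRec, jobConf, splits, readSchedule,
            new ReadsKeyValueParserFactory(kmers));
    PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
            readOperator, ncNodeNames);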
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
index d221834..6d30fad 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
@@ -17,7 +17,7 @@
@Override
protected void initJobConfiguration() {
// TODO Auto-generated method stub
-
+
}
}
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
index 6b7d98e..43b5a97 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
@@ -17,7 +17,7 @@
@Override
protected void initJobConfiguration() {
// TODO Auto-generated method stub
-
+
}
}
diff --git a/genomix/genomix-core/src/main/resources/conf/cluster.properties b/genomix/genomix-core/src/main/resources/conf/cluster.properties
index 5b2b757..77abcf5 100644
--- a/genomix/genomix-core/src/main/resources/conf/cluster.properties
+++ b/genomix/genomix-core/src/main/resources/conf/cluster.properties
@@ -25,6 +25,9 @@
#The JAVA_HOME
JAVA_HOME=$JAVA_HOME
+#HADOOP_HOME
+CLASSPATH="${HADOOP_HOME}:${CLASSPATH}:."
+
#The frame size of the internal dataflow engine
FRAME_SIZE=65536
diff --git a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
index 5de25ba..8aad1b9 100644
--- a/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
+++ b/genomix/genomix-core/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTestCase.java
@@ -5,6 +5,7 @@
import java.io.FileOutputStream;
import java.io.IOException;
+import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
@@ -12,102 +13,201 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.junit.Test;
+import org.apache.hadoop.mapred.TextInputFormat;
import edu.uci.ics.genomix.driver.Driver;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
+import edu.uci.ics.hyracks.hdfs.lib.RawBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.hdfs.lib.TextKeyValueParserFactory;
+import edu.uci.ics.hyracks.hdfs.lib.TextTupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
+import edu.uci.ics.hyracks.hdfs.utils.TestUtils;
-public class JobRunTestCase extends TestCase{
- private static final String ACTUAL_RESULT_DIR = "actual";
- private static final String EXPECTED_RESULT_PATH = "src/test/resources/expected";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+public class JobRunTestCase extends TestCase {
+ private static final String ACTUAL_RESULT_DIR = "actual";
+ private static final String EXPECTED_RESULT_PATH = "src/test/resources/expected";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String DATA_PATH = "src/test/resources/data/customer.tbl";
- private static final String HDFS_INPUT_PATH = "/customer/";
- private static final String HDFS_OUTPUT_PATH = "/customer_result/";
+ private static final String DATA_PATH = "src/test/resources/data/customer.tbl";
+ private static final String HDFS_INPUT_PATH = "/customer/";
+ private static final String HDFS_OUTPUT_PATH = "/customer_result/";
- private static final String HYRACKS_APP_NAME = "genomix";
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
+ private static final String HYRACKS_APP_NAME = "genomix";
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
+ + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
- private JobConf conf = new JobConf();
- private int numberOfNC = 2;
- private int numPartitionPerMachine=2;
-
- private Driver myDriver;
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
+ private int numPartitionPerMachine = 2;
+
+ private Driver myDriver;
+
@Override
protected void setUp() throws Exception {
- cleanupStores();
- HyracksUtils.init();
- HyracksUtils.createApp(HYRACKS_APP_NAME);
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
-
- myDriver = new Driver(HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT, numPartitionPerMachine);
- }
+ cleanupStores();
+ HyracksUtils.init();
+ HyracksUtils.createApp(HYRACKS_APP_NAME);
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
+ myDriver = new Driver(HyracksUtils.CC_HOST,
+ HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT,
+ numPartitionPerMachine);
+ }
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_PATH);
- Path dest = new Path(HDFS_INPUT_PATH);
- Path result = new Path(HDFS_OUTPUT_PATH);
- dfs.mkdirs(dest);
- dfs.mkdirs(result);
- dfs.copyFromLocalFile(src, dest);
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
- DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
+ Path src = new Path(DATA_PATH);
+ Path dest = new Path(HDFS_INPUT_PATH);
+ Path result = new Path(HDFS_OUTPUT_PATH);
+ dfs.mkdirs(dest);
+ dfs.mkdirs(result);
+ dfs.copyFromLocalFile(src, dest);
-
+ DataOutputStream confOutput = new DataOutputStream(
+ new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
+
@Override
protected void runTest() throws Throwable {
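+ // The group-by variants below are still stubs; runHdfsJob() exercises the full read-sort-write pipeline.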
TestExternalGroupby();
TestPreClusterGroupby();
TestHybridGroupby();
}
-
- void TestExternalGroupby() throws Exception{
- // TODO
+
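+ /**
+ * Runs a line-sort job on the mini cluster: read customer.tbl from HDFS
+ * (one tuple per line), sort each partition on the raw bytes of the line,
+ * merge the sorted partitions, and write a single result file back to HDFS.
+ */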
+ public void runHdfsJob() throws Throwable {
+
+ FileInputFormat.setInputPaths(conf, HDFS_INPUT_PATH);
+ FileOutputFormat.setOutputPath(conf, new Path(HDFS_OUTPUT_PATH));
+ conf.setInputFormat(TextInputFormat.class);
+
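+ // Ask the HDFS scheduler for a read schedule so each split is consumed
+ // on a node controller close to its data.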
+ Scheduler scheduler = new Scheduler(HyracksUtils.CC_HOST,
+ HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
+ InputSplit[] splits = conf.getInputFormat().getSplits(conf,
+ numberOfNC * 4);
+
+ String[] readSchedule = scheduler.getLocationConstraints(splits);
+ JobSpecification jobSpec = new JobSpecification();
+ RecordDescriptor recordDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
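+ // Four read/sort partitions, two pinned to each node controller.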
+ String[] locations = new String[] { HyracksUtils.NC1_ID,
+ HyracksUtils.NC1_ID, HyracksUtils.NC2_ID, HyracksUtils.NC2_ID };
+ HDFSReadOperatorDescriptor readOperator = new HDFSReadOperatorDescriptor(
+ jobSpec, recordDesc, conf, splits, readSchedule,
+ new TextKeyValueParserFactory());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+ readOperator, locations);
+
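+ // External sort with a 10-frame budget, comparing the raw bytes of field 0.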
+ ExternalSortOperatorDescriptor sortOperator = new ExternalSortOperatorDescriptor(
+ jobSpec,
+ 10,
+ new int[] { 0 },
+ new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE },
+ recordDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+ sortOperator, locations);
+
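+ // A single writer on NC1 produces one part file under HDFS_OUTPUT_PATH.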
+ HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(
+ jobSpec, conf, new TextTupleWriterFactory());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
+ writeOperator, HyracksUtils.NC1_ID);
+
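+ // read -> sort is partition-local; sort -> write hash-partitions on field 0
+ // and merges the sorted streams so the single output stays globally ordered.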
+ jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), readOperator,
+ 0, sortOperator, 0);
+ jobSpec.connect(
+ new MToNPartitioningMergingConnectorDescriptor(
+ jobSpec,
+ new FieldHashPartitionComputerFactory(
+ new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { RawBinaryHashFunctionFactory.INSTANCE }),
+ new int[] { 0 },
+ new IBinaryComparatorFactory[] { RawBinaryComparatorFactory.INSTANCE }),
+ sortOperator, 0, writeOperator, 0);
+ jobSpec.addRoot(writeOperator);
+
+ IHyracksClientConnection client = new HyracksConnection(
+ HyracksUtils.CC_HOST, HyracksUtils.TEST_HYRACKS_CC_CLIENT_PORT);
+ JobId jobId = client.startJob(HYRACKS_APP_NAME, jobSpec);
+ client.waitForCompletion(jobId);
+
+ assertTrue(checkResults());
}
-
- void TestPreClusterGroupby() throws Exception{
- // TODO
+
+ void TestExternalGroupby() throws Exception {
+ // TODO
}
-
- void TestHybridGroupby() throws Exception{
+
+ void TestPreClusterGroupby() throws Exception {
// TODO
}
-
+ void TestHybridGroupby() throws Exception {
+ // TODO
+ }
+
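+ // Copies the job output out of HDFS and diffs it against the expected
+ // file; TestUtils.compareWithResult throws on any mismatch.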
+ private boolean checkResults() throws Exception {
+ FileSystem dfs = FileSystem.get(conf);
+ Path result = new Path(HDFS_OUTPUT_PATH);
+ Path actual = new Path(ACTUAL_RESULT_DIR);
+ dfs.copyToLocalFile(result, actual);
+
+ TestUtils.compareWithResult(new File(EXPECTED_RESULT_PATH
+ + File.separator + "part-0"), new File(ACTUAL_RESULT_DIR
+ + File.separator + "customer_result" + File.separator
+ + "part-0"));
+ return true;
+ }
+
@Override
protected void tearDown() throws Exception {
- HyracksUtils.destroyApp(HYRACKS_APP_NAME);
- HyracksUtils.deinit();
- cleanupHDFS();
+ HyracksUtils.destroyApp(HYRACKS_APP_NAME);
+ HyracksUtils.deinit();
+ cleanupHDFS();
}
-
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
-
+
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
+
}
diff --git a/genomix/genomix-core/src/test/resources/data/customer.tbl b/genomix/genomix-core/src/test/resources/data/customer.tbl
new file mode 100644
index 0000000..5d39c80
--- /dev/null
+++ b/genomix/genomix-core/src/test/resources/data/customer.tbl
@@ -0,0 +1,150 @@
+1|Customer#000000001|IVhzIApeRb ot,c,E|15|25-989-741-2988|711.56|BUILDING|to the even, regular platelets. regular, ironic epitaphs nag e|
+2|Customer#000000002|XSTf4,NCwDVaWNe6tEgvwfmRchLXak|13|23-768-687-3665|121.65|AUTOMOBILE|l accounts. blithely ironic theodolites integrate boldly: caref|
+3|Customer#000000003|MG9kdTD2WBHm|1|11-719-748-3364|7498.12|AUTOMOBILE| deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov|
+4|Customer#000000004|XxVSJsLAGtn|4|14-128-190-5944|2866.83|MACHINERY| requests. final, regular ideas sleep final accou|
+5|Customer#000000005|KvpyuHCplrB84WgAiGV6sYpZq7Tj|3|13-750-942-6364|794.47|HOUSEHOLD|n accounts will have to unwind. foxes cajole accor|
+6|Customer#000000006|sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn|20|30-114-968-4951|7638.57|AUTOMOBILE|tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious|
+7|Customer#000000007|TcGe5gaZNgVePxU5kRrvXBfkasDTea|18|28-190-982-9759|9561.95|AUTOMOBILE|ainst the ironic, express theodolites. express, even pinto beans among the exp|
+8|Customer#000000008|I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5|17|27-147-574-9335|6819.74|BUILDING|among the slyly regular theodolites kindle blithely courts. carefully even theodolites haggle slyly along the ide|
+9|Customer#000000009|xKiAFTjUsCuxfeleNqefumTrjS|8|18-338-906-3675|8324.07|FURNITURE|r theodolites according to the requests wake thinly excuses: pending requests haggle furiousl|
+10|Customer#000000010|6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2|5|15-741-346-9870|2753.54|HOUSEHOLD|es regular deposits haggle. fur|
+11|Customer#000000011|PkWS 3HlXqwTuzrKg633BEi|23|33-464-151-3439|-272.60|BUILDING|ckages. requests sleep slyly. quickly even pinto beans promise above the slyly regular pinto beans. |
+12|Customer#000000012|9PWKuhzT4Zr1Q|13|23-791-276-1263|3396.49|HOUSEHOLD| to the carefully final braids. blithely regular requests nag. ironic theodolites boost quickly along|
+13|Customer#000000013|nsXQu0oVjD7PM659uC3SRSp|3|13-761-547-5974|3857.34|BUILDING|ounts sleep carefully after the close frays. carefully bold notornis use ironic requests. blithely|
+14|Customer#000000014|KXkletMlL2JQEA |1|11-845-129-3851|5266.30|FURNITURE|, ironic packages across the unus|
+15|Customer#000000015|YtWggXoOLdwdo7b0y,BZaGUQMLJMX1Y,EC,6Dn|23|33-687-542-7601|2788.52|HOUSEHOLD| platelets. regular deposits detect asymptotes. blithely unusual packages nag slyly at the fluf|
+16|Customer#000000016|cYiaeMLZSMAOQ2 d0W,|10|20-781-609-3107|4681.03|FURNITURE|kly silent courts. thinly regular theodolites sleep fluffily after |
+17|Customer#000000017|izrh 6jdqtp2eqdtbkswDD8SG4SzXruMfIXyR7|2|12-970-682-3487|6.34|AUTOMOBILE|packages wake! blithely even pint|
+18|Customer#000000018|3txGO AiuFux3zT0Z9NYaFRnZt|6|16-155-215-1315|5494.43|BUILDING|s sleep. carefully even instructions nag furiously alongside of t|
+19|Customer#000000019|uc,3bHIx84H,wdrmLOjVsiqXCq2tr|18|28-396-526-5053|8914.71|HOUSEHOLD| nag. furiously careful packages are slyly at the accounts. furiously regular in|
+20|Customer#000000020|JrPk8Pqplj4Ne|22|32-957-234-8742|7603.40|FURNITURE|g alongside of the special excuses-- fluffily enticing packages wake |
+21|Customer#000000021|XYmVpr9yAHDEn|8|18-902-614-8344|1428.25|MACHINERY| quickly final accounts integrate blithely furiously u|
+22|Customer#000000022|QI6p41,FNs5k7RZoCCVPUTkUdYpB|3|13-806-545-9701|591.98|MACHINERY|s nod furiously above the furiously ironic ideas. |
+23|Customer#000000023|OdY W13N7Be3OC5MpgfmcYss0Wn6TKT|3|13-312-472-8245|3332.02|HOUSEHOLD|deposits. special deposits cajole slyly. fluffily special deposits about the furiously |
+24|Customer#000000024|HXAFgIAyjxtdqwimt13Y3OZO 4xeLe7U8PqG|13|23-127-851-8031|9255.67|MACHINERY|into beans. fluffily final ideas haggle fluffily|
+25|Customer#000000025|Hp8GyFQgGHFYSilH5tBfe|12|22-603-468-3533|7133.70|FURNITURE|y. accounts sleep ruthlessly according to the regular theodolites. unusual instructions sleep. ironic, final|
+26|Customer#000000026|8ljrc5ZeMl7UciP|22|32-363-455-4837|5182.05|AUTOMOBILE|c requests use furiously ironic requests. slyly ironic dependencies us|
+27|Customer#000000027|IS8GIyxpBrLpMT0u7|3|13-137-193-2709|5679.84|BUILDING| about the carefully ironic pinto beans. accoun|
+28|Customer#000000028|iVyg0daQ,Tha8x2WPWA9m2529m|8|18-774-241-1462|1007.18|FURNITURE| along the regular deposits. furiously final pac|
+29|Customer#000000029|sJ5adtfyAkCK63df2,vF25zyQMVYE34uh|0|10-773-203-7342|7618.27|FURNITURE|its after the carefully final platelets x-ray against |
+30|Customer#000000030|nJDsELGAavU63Jl0c5NKsKfL8rIJQQkQnYL2QJY|1|11-764-165-5076|9321.01|BUILDING|lithely final requests. furiously unusual account|
+31|Customer#000000031|LUACbO0viaAv6eXOAebryDB xjVst|23|33-197-837-7094|5236.89|HOUSEHOLD|s use among the blithely pending depo|
+32|Customer#000000032|jD2xZzi UmId,DCtNBLXKj9q0Tlp2iQ6ZcO3J|15|25-430-914-2194|3471.53|BUILDING|cial ideas. final, furious requests across the e|
+33|Customer#000000033|qFSlMuLucBmx9xnn5ib2csWUweg D|17|27-375-391-1280|-78.56|AUTOMOBILE|s. slyly regular accounts are furiously. carefully pending requests|
+34|Customer#000000034|Q6G9wZ6dnczmtOx509xgE,M2KV|15|25-344-968-5422|8589.70|HOUSEHOLD|nder against the even, pending accounts. even|
+35|Customer#000000035|TEjWGE4nBzJL2|17|27-566-888-7431|1228.24|HOUSEHOLD|requests. special, express requests nag slyly furiousl|
+36|Customer#000000036|3TvCzjuPzpJ0,DdJ8kW5U|21|31-704-669-5769|4987.27|BUILDING|haggle. enticing, quiet platelets grow quickly bold sheaves. carefully regular acc|
+37|Customer#000000037|7EV4Pwh,3SboctTWt|8|18-385-235-7162|-917.75|FURNITURE|ilent packages are carefully among the deposits. furiousl|
+38|Customer#000000038|a5Ee5e9568R8RLP 2ap7|12|22-306-880-7212|6345.11|HOUSEHOLD|lar excuses. closely even asymptotes cajole blithely excuses. carefully silent pinto beans sleep carefully fin|
+39|Customer#000000039|nnbRg,Pvy33dfkorYE FdeZ60|2|12-387-467-6509|6264.31|AUTOMOBILE|tions. slyly silent excuses slee|
+40|Customer#000000040|gOnGWAyhSV1ofv|3|13-652-915-8939|1335.30|BUILDING|rges impress after the slyly ironic courts. foxes are. blithely |
+41|Customer#000000041|IM9mzmyoxeBmvNw8lA7G3Ydska2nkZF|10|20-917-711-4011|270.95|HOUSEHOLD|ly regular accounts hang bold, silent packages. unusual foxes haggle slyly above the special, final depo|
+42|Customer#000000042|ziSrvyyBke|5|15-416-330-4175|8727.01|BUILDING|ssly according to the pinto beans: carefully special requests across the even, pending accounts wake special|
+43|Customer#000000043|ouSbjHk8lh5fKX3zGso3ZSIj9Aa3PoaFd|19|29-316-665-2897|9904.28|MACHINERY|ial requests: carefully pending foxes detect quickly. carefully final courts cajole quickly. carefully|
+44|Customer#000000044|Oi,dOSPwDu4jo4x,,P85E0dmhZGvNtBwi|16|26-190-260-5375|7315.94|AUTOMOBILE|r requests around the unusual, bold a|
+45|Customer#000000045|4v3OcpFgoOmMG,CbnF,4mdC|9|19-715-298-9917|9983.38|AUTOMOBILE|nto beans haggle slyly alongside of t|
+46|Customer#000000046|eaTXWWm10L9|6|16-357-681-2007|5744.59|AUTOMOBILE|ctions. accounts sleep furiously even requests. regular, regular accounts cajole blithely around the final pa|
+47|Customer#000000047|b0UgocSqEW5 gdVbhNT|2|12-427-271-9466|274.58|BUILDING|ions. express, ironic instructions sleep furiously ironic ideas. furi|
+48|Customer#000000048|0UU iPhBupFvemNB|0|10-508-348-5882|3792.50|BUILDING|re fluffily pending foxes. pending, bold platelets sleep slyly. even platelets cajo|
+49|Customer#000000049|cNgAeX7Fqrdf7HQN9EwjUa4nxT,68L FKAxzl|10|20-908-631-4424|4573.94|FURNITURE|nusual foxes! fluffily pending packages maintain to the regular |
+50|Customer#000000050|9SzDYlkzxByyJ1QeTI o|6|16-658-112-3221|4266.13|MACHINERY|ts. furiously ironic accounts cajole furiously slyly ironic dinos.|
+51|Customer#000000051|uR,wEaiTvo4|12|22-344-885-4251|855.87|FURNITURE|eposits. furiously regular requests integrate carefully packages. furious|
+52|Customer#000000052|7 QOqGqqSy9jfV51BC71jcHJSD0|11|21-186-284-5998|5630.28|HOUSEHOLD|ic platelets use evenly even accounts. stealthy theodolites cajole furiou|
+53|Customer#000000053|HnaxHzTfFTZs8MuCpJyTbZ47Cm4wFOOgib|15|25-168-852-5363|4113.64|HOUSEHOLD|ar accounts are. even foxes are blithely. fluffily pending deposits boost|
+54|Customer#000000054|,k4vf 5vECGWFy,hosTE,|4|14-776-370-4745|868.90|AUTOMOBILE|sual, silent accounts. furiously express accounts cajole special deposits. final, final accounts use furi|
+55|Customer#000000055|zIRBR4KNEl HzaiV3a i9n6elrxzDEh8r8pDom|10|20-180-440-8525|4572.11|MACHINERY|ully unusual packages wake bravely bold packages. unusual requests boost deposits! blithely ironic packages ab|
+56|Customer#000000056|BJYZYJQk4yD5B|10|20-895-685-6920|6530.86|FURNITURE|. notornis wake carefully. carefully fluffy requests are furiously even accounts. slyly expre|
+57|Customer#000000057|97XYbsuOPRXPWU|21|31-835-306-1650|4151.93|AUTOMOBILE|ove the carefully special packages. even, unusual deposits sleep slyly pend|
+58|Customer#000000058|g9ap7Dk1Sv9fcXEWjpMYpBZIRUohi T|13|23-244-493-2508|6478.46|HOUSEHOLD|ideas. ironic ideas affix furiously express, final instructions. regular excuses use quickly e|
+59|Customer#000000059|zLOCP0wh92OtBihgspOGl4|1|11-355-584-3112|3458.60|MACHINERY|ously final packages haggle blithely after the express deposits. furiou|
+60|Customer#000000060|FyodhjwMChsZmUz7Jz0H|12|22-480-575-5866|2741.87|MACHINERY|latelets. blithely unusual courts boost furiously about the packages. blithely final instruct|
+61|Customer#000000061|9kndve4EAJxhg3veF BfXr7AqOsT39o gtqjaYE|17|27-626-559-8599|1536.24|FURNITURE|egular packages shall have to impress along the |
+62|Customer#000000062|upJK2Dnw13,|7|17-361-978-7059|595.61|MACHINERY|kly special dolphins. pinto beans are slyly. quickly regular accounts are furiously a|
+63|Customer#000000063|IXRSpVWWZraKII|21|31-952-552-9584|9331.13|AUTOMOBILE|ithely even accounts detect slyly above the fluffily ir|
+64|Customer#000000064|MbCeGY20kaKK3oalJD,OT|3|13-558-731-7204|-646.64|BUILDING|structions after the quietly ironic theodolites cajole be|
+65|Customer#000000065|RGT yzQ0y4l0H90P783LG4U95bXQFDRXbWa1sl,X|23|33-733-623-5267|8795.16|AUTOMOBILE|y final foxes serve carefully. theodolites are carefully. pending i|
+66|Customer#000000066|XbsEqXH1ETbJYYtA1A|22|32-213-373-5094|242.77|HOUSEHOLD|le slyly accounts. carefully silent packages benea|
+67|Customer#000000067|rfG0cOgtr5W8 xILkwp9fpCS8|9|19-403-114-4356|8166.59|MACHINERY|indle furiously final, even theodo|
+68|Customer#000000068|o8AibcCRkXvQFh8hF,7o|12|22-918-832-2411|6853.37|HOUSEHOLD| pending pinto beans impress realms. final dependencies |
+69|Customer#000000069|Ltx17nO9Wwhtdbe9QZVxNgP98V7xW97uvSH1prEw|9|19-225-978-5670|1709.28|HOUSEHOLD|thely final ideas around the quickly final dependencies affix carefully quickly final theodolites. final accounts c|
+70|Customer#000000070|mFowIuhnHjp2GjCiYYavkW kUwOjIaTCQ|22|32-828-107-2832|4867.52|FURNITURE|fter the special asymptotes. ideas after the unusual frets cajole quickly regular pinto be|
+71|Customer#000000071|TlGalgdXWBmMV,6agLyWYDyIz9MKzcY8gl,w6t1B|7|17-710-812-5403|-611.19|HOUSEHOLD|g courts across the regular, final pinto beans are blithely pending ac|
+72|Customer#000000072|putjlmskxE,zs,HqeIA9Wqu7dhgH5BVCwDwHHcf|2|12-759-144-9689|-362.86|FURNITURE|ithely final foxes sleep always quickly bold accounts. final wat|
+73|Customer#000000073|8IhIxreu4Ug6tt5mog4|0|10-473-439-3214|4288.50|BUILDING|usual, unusual packages sleep busily along the furiou|
+74|Customer#000000074|IkJHCA3ZThF7qL7VKcrU nRLl,kylf |4|14-199-862-7209|2764.43|MACHINERY|onic accounts. blithely slow packages would haggle carefully. qui|
+75|Customer#000000075|Dh 6jZ,cwxWLKQfRKkiGrzv6pm|18|28-247-803-9025|6684.10|AUTOMOBILE| instructions cajole even, even deposits. finally bold deposits use above the even pains. slyl|
+76|Customer#000000076|m3sbCvjMOHyaOofH,e UkGPtqc4|0|10-349-718-3044|5745.33|FURNITURE|pecial deposits. ironic ideas boost blithely according to the closely ironic theodolites! furiously final deposits n|
+77|Customer#000000077|4tAE5KdMFGD4byHtXF92vx|17|27-269-357-4674|1738.87|BUILDING|uffily silent requests. carefully ironic asymptotes among the ironic hockey players are carefully bli|
+78|Customer#000000078|HBOta,ZNqpg3U2cSL0kbrftkPwzX|9|19-960-700-9191|7136.97|FURNITURE|ests. blithely bold pinto beans h|
+79|Customer#000000079|n5hH2ftkVRwW8idtD,BmM2|15|25-147-850-4166|5121.28|MACHINERY|es. packages haggle furiously. regular, special requests poach after the quickly express ideas. blithely pending re|
+80|Customer#000000080|K,vtXp8qYB |0|10-267-172-7101|7383.53|FURNITURE|tect among the dependencies. bold accounts engage closely even pinto beans. ca|
+81|Customer#000000081|SH6lPA7JiiNC6dNTrR|20|30-165-277-3269|2023.71|BUILDING|r packages. fluffily ironic requests cajole fluffily. ironically regular theodolit|
+82|Customer#000000082|zhG3EZbap4c992Gj3bK,3Ne,Xn|18|28-159-442-5305|9468.34|AUTOMOBILE|s wake. bravely regular accounts are furiously. regula|
+83|Customer#000000083|HnhTNB5xpnSF20JBH4Ycs6psVnkC3RDf|22|32-817-154-4122|6463.51|BUILDING|ccording to the quickly bold warhorses. final, regular foxes integrate carefully. bold packages nag blithely ev|
+84|Customer#000000084|lpXz6Fwr9945rnbtMc8PlueilS1WmASr CB|11|21-546-818-3802|5174.71|FURNITURE|ly blithe foxes. special asymptotes haggle blithely against the furiously regular depo|
+85|Customer#000000085|siRerlDwiolhYR 8FgksoezycLj|5|15-745-585-8219|3386.64|FURNITURE|ronic ideas use above the slowly pendin|
+86|Customer#000000086|US6EGGHXbTTXPL9SBsxQJsuvy|0|10-677-951-2353|3306.32|HOUSEHOLD|quests. pending dugouts are carefully aroun|
+87|Customer#000000087|hgGhHVSWQl 6jZ6Ev|23|33-869-884-7053|6327.54|FURNITURE|hely ironic requests integrate according to the ironic accounts. slyly regular pla|
+88|Customer#000000088|wtkjBN9eyrFuENSMmMFlJ3e7jE5KXcg|16|26-516-273-2566|8031.44|AUTOMOBILE|s are quickly above the quickly ironic instructions; even requests about the carefully final deposi|
+89|Customer#000000089|dtR, y9JQWUO6FoJExyp8whOU|14|24-394-451-5404|1530.76|FURNITURE|counts are slyly beyond the slyly final accounts. quickly final ideas wake. r|
+90|Customer#000000090|QxCzH7VxxYUWwfL7|16|26-603-491-1238|7354.23|BUILDING|sly across the furiously even |
+91|Customer#000000091|S8OMYFrpHwoNHaGBeuS6E 6zhHGZiprw1b7 q|8|18-239-400-3677|4643.14|AUTOMOBILE|onic accounts. fluffily silent pinto beans boost blithely according to the fluffily exp|
+92|Customer#000000092|obP PULk2LH LqNF,K9hcbNqnLAkJVsl5xqSrY,|2|12-446-416-8471|1182.91|MACHINERY|. pinto beans hang slyly final deposits. ac|
+93|Customer#000000093|EHXBr2QGdh|7|17-359-388-5266|2182.52|MACHINERY|press deposits. carefully regular platelets r|
+94|Customer#000000094|IfVNIN9KtkScJ9dUjK3Pg5gY1aFeaXewwf|9|19-953-499-8833|5500.11|HOUSEHOLD|latelets across the bold, final requests sleep according to the fluffily bold accounts. unusual deposits amon|
+95|Customer#000000095|EU0xvmWvOmUUn5J,2z85DQyG7QCJ9Xq7|15|25-923-255-2929|5327.38|MACHINERY|ithely. ruthlessly final requests wake slyly alongside of the furiously silent pinto beans. even the|
+96|Customer#000000096|vWLOrmXhRR|8|18-422-845-1202|6323.92|AUTOMOBILE|press requests believe furiously. carefully final instructions snooze carefully. |
+97|Customer#000000097|OApyejbhJG,0Iw3j rd1M|17|27-588-919-5638|2164.48|AUTOMOBILE|haggle slyly. bold, special ideas are blithely above the thinly bold theo|
+98|Customer#000000098|7yiheXNSpuEAwbswDW|12|22-885-845-6889|-551.37|BUILDING|ages. furiously pending accounts are quickly carefully final foxes: busily pe|
+99|Customer#000000099|szsrOiPtCHVS97Lt|15|25-515-237-9232|4088.65|HOUSEHOLD|cajole slyly about the regular theodolites! furiously bold requests nag along the pending, regular packages. somas|
+100|Customer#000000100|fptUABXcmkC5Wx|20|30-749-445-4907|9889.89|FURNITURE|was furiously fluffily quiet deposits. silent, pending requests boost against |
+101|Customer#000000101|sMmL2rNeHDltovSm Y|2|12-514-298-3699|7470.96|MACHINERY| sleep. pending packages detect slyly ironic pack|
+102|Customer#000000102|UAtflJ06 fn9zBfKjInkQZlWtqaA|19|29-324-978-8538|8462.17|BUILDING|ously regular dependencies nag among the furiously express dinos. blithely final|
+103|Customer#000000103|8KIsQX4LJ7QMsj6DrtFtXu0nUEdV,8a|9|19-216-107-2107|2757.45|BUILDING|furiously pending notornis boost slyly around the blithely ironic ideas? final, even instructions cajole fl|
+104|Customer#000000104|9mcCK L7rt0SwiYtrbO88DiZS7U d7M|10|20-966-284-8065|-588.38|FURNITURE|rate carefully slyly special pla|
+105|Customer#000000105|4iSJe4L SPjg7kJj98Yz3z0B|10|20-793-553-6417|9091.82|MACHINERY|l pains cajole even accounts. quietly final instructi|
+106|Customer#000000106|xGCOEAUjUNG|1|11-751-989-4627|3288.42|MACHINERY|lose slyly. ironic accounts along the evenly regular theodolites wake about the special, final gifts. |
+107|Customer#000000107|Zwg64UZ,q7GRqo3zm7P1tZIRshBDz|15|25-336-529-9919|2514.15|AUTOMOBILE|counts cajole slyly. regular requests wake. furiously regular deposits about the blithely final fo|
+108|Customer#000000108|GPoeEvpKo1|5|15-908-619-7526|2259.38|BUILDING|refully ironic deposits sleep. regular, unusual requests wake slyly|
+109|Customer#000000109|OOOkYBgCMzgMQXUmkocoLb56rfrdWp2NE2c|16|26-992-422-8153|-716.10|BUILDING|es. fluffily final dependencies sleep along the blithely even pinto beans. final deposits haggle furiously furiou|
+110|Customer#000000110|mymPfgphaYXNYtk|10|20-893-536-2069|7462.99|AUTOMOBILE|nto beans cajole around the even, final deposits. quickly bold packages according to the furiously regular dept|
+111|Customer#000000111|CBSbPyOWRorloj2TBvrK9qp9tHBs|22|32-582-283-7528|6505.26|MACHINERY|ly unusual instructions detect fluffily special deposits-- theodolites nag carefully during the ironic dependencies|
+112|Customer#000000112|RcfgG3bO7QeCnfjqJT1|19|29-233-262-8382|2953.35|FURNITURE|rmanently unusual multipliers. blithely ruthless deposits are furiously along the|
+113|Customer#000000113|eaOl5UBXIvdY57rglaIzqvfPD,MYfK|12|22-302-930-4756|2912.00|BUILDING|usly regular theodolites boost furiously doggedly pending instructio|
+114|Customer#000000114|xAt 5f5AlFIU|14|24-805-212-7646|1027.46|FURNITURE|der the carefully express theodolites are after the packages. packages are. bli|
+115|Customer#000000115|0WFt1IXENmUT2BgbsB0ShVKJZt0HCBCbFl0aHc|8|18-971-699-1843|7508.92|HOUSEHOLD|sits haggle above the carefully ironic theodolite|
+116|Customer#000000116|yCuVxIgsZ3,qyK2rloThy3u|16|26-632-309-5792|8403.99|BUILDING|as. quickly final sauternes haggle slyly carefully even packages. brave, ironic pinto beans are above the furious|
+117|Customer#000000117|uNhM,PzsRA3S,5Y Ge5Npuhi|24|34-403-631-3505|3950.83|FURNITURE|affix. instructions are furiously sl|
+118|Customer#000000118|OVnFuHygK9wx3xpg8|18|28-639-943-7051|3582.37|AUTOMOBILE|uick packages alongside of the furiously final deposits haggle above the fluffily even foxes. blithely dogged dep|
+119|Customer#000000119|M1ETOIecuvH8DtM0Y0nryXfW|7|17-697-919-8406|3930.35|FURNITURE|express ideas. blithely ironic foxes thrash. special acco|
+120|Customer#000000120|zBNna00AEInqyO1|12|22-291-534-1571|363.75|MACHINERY| quickly. slyly ironic requests cajole blithely furiously final dependen|
+121|Customer#000000121|tv nCR2YKupGN73mQudO|17|27-411-990-2959|6428.32|BUILDING|uriously stealthy ideas. carefully final courts use carefully|
+122|Customer#000000122|yp5slqoNd26lAENZW3a67wSfXA6hTF|3|13-702-694-4520|7865.46|HOUSEHOLD| the special packages hinder blithely around the permanent requests. bold depos|
+123|Customer#000000123|YsOnaaER8MkvK5cpf4VSlq|5|15-817-151-1168|5897.83|BUILDING|ependencies. regular, ironic requests are fluffily regu|
+124|Customer#000000124|aTbyVAW5tCd,v09O|18|28-183-750-7809|1842.49|AUTOMOBILE|le fluffily even dependencies. quietly s|
+125|Customer#000000125|,wSZXdVR xxIIfm9s8ITyLl3kgjT6UC07GY0Y|19|29-261-996-3120|-234.12|FURNITURE|x-ray finally after the packages? regular requests c|
+126|Customer#000000126|ha4EHmbx3kg DYCsP6DFeUOmavtQlHhcfaqr|22|32-755-914-7592|1001.39|HOUSEHOLD|s about the even instructions boost carefully furiously ironic pearls. ruthless, |
+127|Customer#000000127|Xyge4DX2rXKxXyye1Z47LeLVEYMLf4Bfcj|21|31-101-672-2951|9280.71|MACHINERY|ic, unusual theodolites nod silently after the final, ironic instructions: pending r|
+128|Customer#000000128|AmKUMlJf2NRHcKGmKjLS|4|14-280-874-8044|-986.96|HOUSEHOLD|ing packages integrate across the slyly unusual dugouts. blithely silent ideas sublate carefully. blithely expr|
+129|Customer#000000129|q7m7rbMM0BpaCdmxloCgBDRCleXsXkdD8kf|7|17-415-148-7416|9127.27|HOUSEHOLD| unusual deposits boost carefully furiously silent ideas. pending accounts cajole slyly across|
+130|Customer#000000130|RKPx2OfZy0Vn 8wGWZ7F2EAvmMORl1k8iH|9|19-190-993-9281|5073.58|HOUSEHOLD|ix slowly. express packages along the furiously ironic requests integrate daringly deposits. fur|
+131|Customer#000000131|jyN6lAjb1FtH10rMC,XzlWyCBrg75|11|21-840-210-3572|8595.53|HOUSEHOLD|jole special packages. furiously final dependencies about the furiously speci|
+132|Customer#000000132|QM5YabAsTLp9|4|14-692-150-9717|162.57|HOUSEHOLD|uickly carefully special theodolites. carefully regular requests against the blithely unusual instructions |
+133|Customer#000000133|IMCuXdpIvdkYO92kgDGuyHgojcUs88p|17|27-408-997-8430|2314.67|AUTOMOBILE|t packages. express pinto beans are blithely along the unusual, even theodolites. silent packages use fu|
+134|Customer#000000134|sUiZ78QCkTQPICKpA9OBzkUp2FM|11|21-200-159-5932|4608.90|BUILDING|yly fluffy foxes boost final ideas. b|
+135|Customer#000000135|oZK,oC0 fdEpqUML|19|29-399-293-6241|8732.91|FURNITURE| the slyly final accounts. deposits cajole carefully. carefully sly packag|
+136|Customer#000000136|QoLsJ0v5C1IQbh,DS1|7|17-501-210-4726|-842.39|FURNITURE|ackages sleep ironic, final courts. even requests above the blithely bold requests g|
+137|Customer#000000137|cdW91p92rlAEHgJafqYyxf1Q|16|26-777-409-5654|7838.30|HOUSEHOLD|carefully regular theodolites use. silent dolphins cajo|
+138|Customer#000000138|5uyLAeY7HIGZqtu66Yn08f|5|15-394-860-4589|430.59|MACHINERY|ts doze on the busy ideas. regular|
+139|Customer#000000139|3ElvBwudHKL02732YexGVFVt |9|19-140-352-1403|7897.78|MACHINERY|nstructions. quickly ironic ideas are carefully. bold, |
+140|Customer#000000140|XRqEPiKgcETII,iOLDZp5jA|4|14-273-885-6505|9963.15|MACHINERY|ies detect slyly ironic accounts. slyly ironic theodolites hag|
+141|Customer#000000141|5IW,WROVnikc3l7DwiUDGQNGsLBGOL6Dc0|1|11-936-295-6204|6706.14|FURNITURE|packages nag furiously. carefully unusual accounts snooze according to the fluffily regular pinto beans. slyly spec|
+142|Customer#000000142|AnJ5lxtLjioClr2khl9pb8NLxG2,|9|19-407-425-2584|2209.81|AUTOMOBILE|. even, express theodolites upo|
+143|Customer#000000143|681r22uL452zqk 8By7I9o9enQfx0|16|26-314-406-7725|2186.50|MACHINERY|across the blithely unusual requests haggle theodo|
+144|Customer#000000144|VxYZ3ebhgbltnetaGjNC8qCccjYU05 fePLOno8y|1|11-717-379-4478|6417.31|MACHINERY|ges. slyly regular accounts are slyly. bold, idle reque|
+145|Customer#000000145|kQjHmt2kcec cy3hfMh969u|13|23-562-444-8454|9748.93|HOUSEHOLD|ests? express, express instructions use. blithely fina|
+146|Customer#000000146|GdxkdXG9u7iyI1,,y5tq4ZyrcEy|3|13-835-723-3223|3328.68|FURNITURE|ffily regular dinos are slyly unusual requests. slyly specia|
+147|Customer#000000147|6VvIwbVdmcsMzuu,C84GtBWPaipGfi7DV|18|28-803-187-4335|8071.40|AUTOMOBILE|ress packages above the blithely regular packages sleep fluffily blithely ironic accounts. |
+148|Customer#000000148|BhSPlEWGvIJyT9swk vCWE|11|21-562-498-6636|2135.60|HOUSEHOLD|ing to the carefully ironic requests. carefully regular dependencies about the theodolites wake furious|
+149|Customer#000000149|3byTHCp2mNLPigUrrq|19|29-797-439-6760|8959.65|AUTOMOBILE|al instructions haggle against the slyly bold w|
+150|Customer#000000150|zeoGShTjCwGPplOWFkLURrh41O0AZ8dwNEEN4 |18|28-328-564-7630|3849.48|MACHINERY|ole blithely among the furiously pending packages. furiously bold ideas wake fluffily ironic idea|
diff --git a/genomix/genomix-core/src/test/resources/expected/part-0 b/genomix/genomix-core/src/test/resources/expected/part-0
new file mode 100755
index 0000000..ce3b00c
--- /dev/null
+++ b/genomix/genomix-core/src/test/resources/expected/part-0
@@ -0,0 +1,150 @@
+100|Customer#000000100|fptUABXcmkC5Wx|20|30-749-445-4907|9889.89|FURNITURE|was furiously fluffily quiet deposits. silent, pending requests boost against |
+101|Customer#000000101|sMmL2rNeHDltovSm Y|2|12-514-298-3699|7470.96|MACHINERY| sleep. pending packages detect slyly ironic pack|
+102|Customer#000000102|UAtflJ06 fn9zBfKjInkQZlWtqaA|19|29-324-978-8538|8462.17|BUILDING|ously regular dependencies nag among the furiously express dinos. blithely final|
+103|Customer#000000103|8KIsQX4LJ7QMsj6DrtFtXu0nUEdV,8a|9|19-216-107-2107|2757.45|BUILDING|furiously pending notornis boost slyly around the blithely ironic ideas? final, even instructions cajole fl|
+104|Customer#000000104|9mcCK L7rt0SwiYtrbO88DiZS7U d7M|10|20-966-284-8065|-588.38|FURNITURE|rate carefully slyly special pla|
+105|Customer#000000105|4iSJe4L SPjg7kJj98Yz3z0B|10|20-793-553-6417|9091.82|MACHINERY|l pains cajole even accounts. quietly final instructi|
+106|Customer#000000106|xGCOEAUjUNG|1|11-751-989-4627|3288.42|MACHINERY|lose slyly. ironic accounts along the evenly regular theodolites wake about the special, final gifts. |
+107|Customer#000000107|Zwg64UZ,q7GRqo3zm7P1tZIRshBDz|15|25-336-529-9919|2514.15|AUTOMOBILE|counts cajole slyly. regular requests wake. furiously regular deposits about the blithely final fo|
+108|Customer#000000108|GPoeEvpKo1|5|15-908-619-7526|2259.38|BUILDING|refully ironic deposits sleep. regular, unusual requests wake slyly|
+109|Customer#000000109|OOOkYBgCMzgMQXUmkocoLb56rfrdWp2NE2c|16|26-992-422-8153|-716.10|BUILDING|es. fluffily final dependencies sleep along the blithely even pinto beans. final deposits haggle furiously furiou|
+10|Customer#000000010|6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2|5|15-741-346-9870|2753.54|HOUSEHOLD|es regular deposits haggle. fur|
+110|Customer#000000110|mymPfgphaYXNYtk|10|20-893-536-2069|7462.99|AUTOMOBILE|nto beans cajole around the even, final deposits. quickly bold packages according to the furiously regular dept|
+111|Customer#000000111|CBSbPyOWRorloj2TBvrK9qp9tHBs|22|32-582-283-7528|6505.26|MACHINERY|ly unusual instructions detect fluffily special deposits-- theodolites nag carefully during the ironic dependencies|
+112|Customer#000000112|RcfgG3bO7QeCnfjqJT1|19|29-233-262-8382|2953.35|FURNITURE|rmanently unusual multipliers. blithely ruthless deposits are furiously along the|
+113|Customer#000000113|eaOl5UBXIvdY57rglaIzqvfPD,MYfK|12|22-302-930-4756|2912.00|BUILDING|usly regular theodolites boost furiously doggedly pending instructio|
+114|Customer#000000114|xAt 5f5AlFIU|14|24-805-212-7646|1027.46|FURNITURE|der the carefully express theodolites are after the packages. packages are. bli|
+115|Customer#000000115|0WFt1IXENmUT2BgbsB0ShVKJZt0HCBCbFl0aHc|8|18-971-699-1843|7508.92|HOUSEHOLD|sits haggle above the carefully ironic theodolite|
+116|Customer#000000116|yCuVxIgsZ3,qyK2rloThy3u|16|26-632-309-5792|8403.99|BUILDING|as. quickly final sauternes haggle slyly carefully even packages. brave, ironic pinto beans are above the furious|
+117|Customer#000000117|uNhM,PzsRA3S,5Y Ge5Npuhi|24|34-403-631-3505|3950.83|FURNITURE|affix. instructions are furiously sl|
+118|Customer#000000118|OVnFuHygK9wx3xpg8|18|28-639-943-7051|3582.37|AUTOMOBILE|uick packages alongside of the furiously final deposits haggle above the fluffily even foxes. blithely dogged dep|
+119|Customer#000000119|M1ETOIecuvH8DtM0Y0nryXfW|7|17-697-919-8406|3930.35|FURNITURE|express ideas. blithely ironic foxes thrash. special acco|
+11|Customer#000000011|PkWS 3HlXqwTuzrKg633BEi|23|33-464-151-3439|-272.60|BUILDING|ckages. requests sleep slyly. quickly even pinto beans promise above the slyly regular pinto beans. |
+120|Customer#000000120|zBNna00AEInqyO1|12|22-291-534-1571|363.75|MACHINERY| quickly. slyly ironic requests cajole blithely furiously final dependen|
+121|Customer#000000121|tv nCR2YKupGN73mQudO|17|27-411-990-2959|6428.32|BUILDING|uriously stealthy ideas. carefully final courts use carefully|
+122|Customer#000000122|yp5slqoNd26lAENZW3a67wSfXA6hTF|3|13-702-694-4520|7865.46|HOUSEHOLD| the special packages hinder blithely around the permanent requests. bold depos|
+123|Customer#000000123|YsOnaaER8MkvK5cpf4VSlq|5|15-817-151-1168|5897.83|BUILDING|ependencies. regular, ironic requests are fluffily regu|
+124|Customer#000000124|aTbyVAW5tCd,v09O|18|28-183-750-7809|1842.49|AUTOMOBILE|le fluffily even dependencies. quietly s|
+125|Customer#000000125|,wSZXdVR xxIIfm9s8ITyLl3kgjT6UC07GY0Y|19|29-261-996-3120|-234.12|FURNITURE|x-ray finally after the packages? regular requests c|
+126|Customer#000000126|ha4EHmbx3kg DYCsP6DFeUOmavtQlHhcfaqr|22|32-755-914-7592|1001.39|HOUSEHOLD|s about the even instructions boost carefully furiously ironic pearls. ruthless, |
+127|Customer#000000127|Xyge4DX2rXKxXyye1Z47LeLVEYMLf4Bfcj|21|31-101-672-2951|9280.71|MACHINERY|ic, unusual theodolites nod silently after the final, ironic instructions: pending r|
+128|Customer#000000128|AmKUMlJf2NRHcKGmKjLS|4|14-280-874-8044|-986.96|HOUSEHOLD|ing packages integrate across the slyly unusual dugouts. blithely silent ideas sublate carefully. blithely expr|
+129|Customer#000000129|q7m7rbMM0BpaCdmxloCgBDRCleXsXkdD8kf|7|17-415-148-7416|9127.27|HOUSEHOLD| unusual deposits boost carefully furiously silent ideas. pending accounts cajole slyly across|
+12|Customer#000000012|9PWKuhzT4Zr1Q|13|23-791-276-1263|3396.49|HOUSEHOLD| to the carefully final braids. blithely regular requests nag. ironic theodolites boost quickly along|
+130|Customer#000000130|RKPx2OfZy0Vn 8wGWZ7F2EAvmMORl1k8iH|9|19-190-993-9281|5073.58|HOUSEHOLD|ix slowly. express packages along the furiously ironic requests integrate daringly deposits. fur|
+131|Customer#000000131|jyN6lAjb1FtH10rMC,XzlWyCBrg75|11|21-840-210-3572|8595.53|HOUSEHOLD|jole special packages. furiously final dependencies about the furiously speci|
+132|Customer#000000132|QM5YabAsTLp9|4|14-692-150-9717|162.57|HOUSEHOLD|uickly carefully special theodolites. carefully regular requests against the blithely unusual instructions |
+133|Customer#000000133|IMCuXdpIvdkYO92kgDGuyHgojcUs88p|17|27-408-997-8430|2314.67|AUTOMOBILE|t packages. express pinto beans are blithely along the unusual, even theodolites. silent packages use fu|
+134|Customer#000000134|sUiZ78QCkTQPICKpA9OBzkUp2FM|11|21-200-159-5932|4608.90|BUILDING|yly fluffy foxes boost final ideas. b|
+135|Customer#000000135|oZK,oC0 fdEpqUML|19|29-399-293-6241|8732.91|FURNITURE| the slyly final accounts. deposits cajole carefully. carefully sly packag|
+136|Customer#000000136|QoLsJ0v5C1IQbh,DS1|7|17-501-210-4726|-842.39|FURNITURE|ackages sleep ironic, final courts. even requests above the blithely bold requests g|
+137|Customer#000000137|cdW91p92rlAEHgJafqYyxf1Q|16|26-777-409-5654|7838.30|HOUSEHOLD|carefully regular theodolites use. silent dolphins cajo|
+138|Customer#000000138|5uyLAeY7HIGZqtu66Yn08f|5|15-394-860-4589|430.59|MACHINERY|ts doze on the busy ideas. regular|
+139|Customer#000000139|3ElvBwudHKL02732YexGVFVt |9|19-140-352-1403|7897.78|MACHINERY|nstructions. quickly ironic ideas are carefully. bold, |
+13|Customer#000000013|nsXQu0oVjD7PM659uC3SRSp|3|13-761-547-5974|3857.34|BUILDING|ounts sleep carefully after the close frays. carefully bold notornis use ironic requests. blithely|
+140|Customer#000000140|XRqEPiKgcETII,iOLDZp5jA|4|14-273-885-6505|9963.15|MACHINERY|ies detect slyly ironic accounts. slyly ironic theodolites hag|
+141|Customer#000000141|5IW,WROVnikc3l7DwiUDGQNGsLBGOL6Dc0|1|11-936-295-6204|6706.14|FURNITURE|packages nag furiously. carefully unusual accounts snooze according to the fluffily regular pinto beans. slyly spec|
+142|Customer#000000142|AnJ5lxtLjioClr2khl9pb8NLxG2,|9|19-407-425-2584|2209.81|AUTOMOBILE|. even, express theodolites upo|
+143|Customer#000000143|681r22uL452zqk 8By7I9o9enQfx0|16|26-314-406-7725|2186.50|MACHINERY|across the blithely unusual requests haggle theodo|
+144|Customer#000000144|VxYZ3ebhgbltnetaGjNC8qCccjYU05 fePLOno8y|1|11-717-379-4478|6417.31|MACHINERY|ges. slyly regular accounts are slyly. bold, idle reque|
+145|Customer#000000145|kQjHmt2kcec cy3hfMh969u|13|23-562-444-8454|9748.93|HOUSEHOLD|ests? express, express instructions use. blithely fina|
+146|Customer#000000146|GdxkdXG9u7iyI1,,y5tq4ZyrcEy|3|13-835-723-3223|3328.68|FURNITURE|ffily regular dinos are slyly unusual requests. slyly specia|
+147|Customer#000000147|6VvIwbVdmcsMzuu,C84GtBWPaipGfi7DV|18|28-803-187-4335|8071.40|AUTOMOBILE|ress packages above the blithely regular packages sleep fluffily blithely ironic accounts. |
+148|Customer#000000148|BhSPlEWGvIJyT9swk vCWE|11|21-562-498-6636|2135.60|HOUSEHOLD|ing to the carefully ironic requests. carefully regular dependencies about the theodolites wake furious|
+149|Customer#000000149|3byTHCp2mNLPigUrrq|19|29-797-439-6760|8959.65|AUTOMOBILE|al instructions haggle against the slyly bold w|
+14|Customer#000000014|KXkletMlL2JQEA |1|11-845-129-3851|5266.30|FURNITURE|, ironic packages across the unus|
+150|Customer#000000150|zeoGShTjCwGPplOWFkLURrh41O0AZ8dwNEEN4 |18|28-328-564-7630|3849.48|MACHINERY|ole blithely among the furiously pending packages. furiously bold ideas wake fluffily ironic idea|
+15|Customer#000000015|YtWggXoOLdwdo7b0y,BZaGUQMLJMX1Y,EC,6Dn|23|33-687-542-7601|2788.52|HOUSEHOLD| platelets. regular deposits detect asymptotes. blithely unusual packages nag slyly at the fluf|
+16|Customer#000000016|cYiaeMLZSMAOQ2 d0W,|10|20-781-609-3107|4681.03|FURNITURE|kly silent courts. thinly regular theodolites sleep fluffily after |
+17|Customer#000000017|izrh 6jdqtp2eqdtbkswDD8SG4SzXruMfIXyR7|2|12-970-682-3487|6.34|AUTOMOBILE|packages wake! blithely even pint|
+18|Customer#000000018|3txGO AiuFux3zT0Z9NYaFRnZt|6|16-155-215-1315|5494.43|BUILDING|s sleep. carefully even instructions nag furiously alongside of t|
+19|Customer#000000019|uc,3bHIx84H,wdrmLOjVsiqXCq2tr|18|28-396-526-5053|8914.71|HOUSEHOLD| nag. furiously careful packages are slyly at the accounts. furiously regular in|
+1|Customer#000000001|IVhzIApeRb ot,c,E|15|25-989-741-2988|711.56|BUILDING|to the even, regular platelets. regular, ironic epitaphs nag e|
+20|Customer#000000020|JrPk8Pqplj4Ne|22|32-957-234-8742|7603.40|FURNITURE|g alongside of the special excuses-- fluffily enticing packages wake |
+21|Customer#000000021|XYmVpr9yAHDEn|8|18-902-614-8344|1428.25|MACHINERY| quickly final accounts integrate blithely furiously u|
+22|Customer#000000022|QI6p41,FNs5k7RZoCCVPUTkUdYpB|3|13-806-545-9701|591.98|MACHINERY|s nod furiously above the furiously ironic ideas. |
+23|Customer#000000023|OdY W13N7Be3OC5MpgfmcYss0Wn6TKT|3|13-312-472-8245|3332.02|HOUSEHOLD|deposits. special deposits cajole slyly. fluffily special deposits about the furiously |
+24|Customer#000000024|HXAFgIAyjxtdqwimt13Y3OZO 4xeLe7U8PqG|13|23-127-851-8031|9255.67|MACHINERY|into beans. fluffily final ideas haggle fluffily|
+25|Customer#000000025|Hp8GyFQgGHFYSilH5tBfe|12|22-603-468-3533|7133.70|FURNITURE|y. accounts sleep ruthlessly according to the regular theodolites. unusual instructions sleep. ironic, final|
+26|Customer#000000026|8ljrc5ZeMl7UciP|22|32-363-455-4837|5182.05|AUTOMOBILE|c requests use furiously ironic requests. slyly ironic dependencies us|
+27|Customer#000000027|IS8GIyxpBrLpMT0u7|3|13-137-193-2709|5679.84|BUILDING| about the carefully ironic pinto beans. accoun|
+28|Customer#000000028|iVyg0daQ,Tha8x2WPWA9m2529m|8|18-774-241-1462|1007.18|FURNITURE| along the regular deposits. furiously final pac|
+29|Customer#000000029|sJ5adtfyAkCK63df2,vF25zyQMVYE34uh|0|10-773-203-7342|7618.27|FURNITURE|its after the carefully final platelets x-ray against |
+2|Customer#000000002|XSTf4,NCwDVaWNe6tEgvwfmRchLXak|13|23-768-687-3665|121.65|AUTOMOBILE|l accounts. blithely ironic theodolites integrate boldly: caref|
+30|Customer#000000030|nJDsELGAavU63Jl0c5NKsKfL8rIJQQkQnYL2QJY|1|11-764-165-5076|9321.01|BUILDING|lithely final requests. furiously unusual account|
+31|Customer#000000031|LUACbO0viaAv6eXOAebryDB xjVst|23|33-197-837-7094|5236.89|HOUSEHOLD|s use among the blithely pending depo|
+32|Customer#000000032|jD2xZzi UmId,DCtNBLXKj9q0Tlp2iQ6ZcO3J|15|25-430-914-2194|3471.53|BUILDING|cial ideas. final, furious requests across the e|
+33|Customer#000000033|qFSlMuLucBmx9xnn5ib2csWUweg D|17|27-375-391-1280|-78.56|AUTOMOBILE|s. slyly regular accounts are furiously. carefully pending requests|
+34|Customer#000000034|Q6G9wZ6dnczmtOx509xgE,M2KV|15|25-344-968-5422|8589.70|HOUSEHOLD|nder against the even, pending accounts. even|
+35|Customer#000000035|TEjWGE4nBzJL2|17|27-566-888-7431|1228.24|HOUSEHOLD|requests. special, express requests nag slyly furiousl|
+36|Customer#000000036|3TvCzjuPzpJ0,DdJ8kW5U|21|31-704-669-5769|4987.27|BUILDING|haggle. enticing, quiet platelets grow quickly bold sheaves. carefully regular acc|
+37|Customer#000000037|7EV4Pwh,3SboctTWt|8|18-385-235-7162|-917.75|FURNITURE|ilent packages are carefully among the deposits. furiousl|
+38|Customer#000000038|a5Ee5e9568R8RLP 2ap7|12|22-306-880-7212|6345.11|HOUSEHOLD|lar excuses. closely even asymptotes cajole blithely excuses. carefully silent pinto beans sleep carefully fin|
+39|Customer#000000039|nnbRg,Pvy33dfkorYE FdeZ60|2|12-387-467-6509|6264.31|AUTOMOBILE|tions. slyly silent excuses slee|
+3|Customer#000000003|MG9kdTD2WBHm|1|11-719-748-3364|7498.12|AUTOMOBILE| deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov|
+40|Customer#000000040|gOnGWAyhSV1ofv|3|13-652-915-8939|1335.30|BUILDING|rges impress after the slyly ironic courts. foxes are. blithely |
+41|Customer#000000041|IM9mzmyoxeBmvNw8lA7G3Ydska2nkZF|10|20-917-711-4011|270.95|HOUSEHOLD|ly regular accounts hang bold, silent packages. unusual foxes haggle slyly above the special, final depo|
+42|Customer#000000042|ziSrvyyBke|5|15-416-330-4175|8727.01|BUILDING|ssly according to the pinto beans: carefully special requests across the even, pending accounts wake special|
+43|Customer#000000043|ouSbjHk8lh5fKX3zGso3ZSIj9Aa3PoaFd|19|29-316-665-2897|9904.28|MACHINERY|ial requests: carefully pending foxes detect quickly. carefully final courts cajole quickly. carefully|
+44|Customer#000000044|Oi,dOSPwDu4jo4x,,P85E0dmhZGvNtBwi|16|26-190-260-5375|7315.94|AUTOMOBILE|r requests around the unusual, bold a|
+45|Customer#000000045|4v3OcpFgoOmMG,CbnF,4mdC|9|19-715-298-9917|9983.38|AUTOMOBILE|nto beans haggle slyly alongside of t|
+46|Customer#000000046|eaTXWWm10L9|6|16-357-681-2007|5744.59|AUTOMOBILE|ctions. accounts sleep furiously even requests. regular, regular accounts cajole blithely around the final pa|
+47|Customer#000000047|b0UgocSqEW5 gdVbhNT|2|12-427-271-9466|274.58|BUILDING|ions. express, ironic instructions sleep furiously ironic ideas. furi|
+48|Customer#000000048|0UU iPhBupFvemNB|0|10-508-348-5882|3792.50|BUILDING|re fluffily pending foxes. pending, bold platelets sleep slyly. even platelets cajo|
+49|Customer#000000049|cNgAeX7Fqrdf7HQN9EwjUa4nxT,68L FKAxzl|10|20-908-631-4424|4573.94|FURNITURE|nusual foxes! fluffily pending packages maintain to the regular |
+4|Customer#000000004|XxVSJsLAGtn|4|14-128-190-5944|2866.83|MACHINERY| requests. final, regular ideas sleep final accou|
+50|Customer#000000050|9SzDYlkzxByyJ1QeTI o|6|16-658-112-3221|4266.13|MACHINERY|ts. furiously ironic accounts cajole furiously slyly ironic dinos.|
+51|Customer#000000051|uR,wEaiTvo4|12|22-344-885-4251|855.87|FURNITURE|eposits. furiously regular requests integrate carefully packages. furious|
+52|Customer#000000052|7 QOqGqqSy9jfV51BC71jcHJSD0|11|21-186-284-5998|5630.28|HOUSEHOLD|ic platelets use evenly even accounts. stealthy theodolites cajole furiou|
+53|Customer#000000053|HnaxHzTfFTZs8MuCpJyTbZ47Cm4wFOOgib|15|25-168-852-5363|4113.64|HOUSEHOLD|ar accounts are. even foxes are blithely. fluffily pending deposits boost|
+54|Customer#000000054|,k4vf 5vECGWFy,hosTE,|4|14-776-370-4745|868.90|AUTOMOBILE|sual, silent accounts. furiously express accounts cajole special deposits. final, final accounts use furi|
+55|Customer#000000055|zIRBR4KNEl HzaiV3a i9n6elrxzDEh8r8pDom|10|20-180-440-8525|4572.11|MACHINERY|ully unusual packages wake bravely bold packages. unusual requests boost deposits! blithely ironic packages ab|
+56|Customer#000000056|BJYZYJQk4yD5B|10|20-895-685-6920|6530.86|FURNITURE|. notornis wake carefully. carefully fluffy requests are furiously even accounts. slyly expre|
+57|Customer#000000057|97XYbsuOPRXPWU|21|31-835-306-1650|4151.93|AUTOMOBILE|ove the carefully special packages. even, unusual deposits sleep slyly pend|
+58|Customer#000000058|g9ap7Dk1Sv9fcXEWjpMYpBZIRUohi T|13|23-244-493-2508|6478.46|HOUSEHOLD|ideas. ironic ideas affix furiously express, final instructions. regular excuses use quickly e|
+59|Customer#000000059|zLOCP0wh92OtBihgspOGl4|1|11-355-584-3112|3458.60|MACHINERY|ously final packages haggle blithely after the express deposits. furiou|
+5|Customer#000000005|KvpyuHCplrB84WgAiGV6sYpZq7Tj|3|13-750-942-6364|794.47|HOUSEHOLD|n accounts will have to unwind. foxes cajole accor|
+60|Customer#000000060|FyodhjwMChsZmUz7Jz0H|12|22-480-575-5866|2741.87|MACHINERY|latelets. blithely unusual courts boost furiously about the packages. blithely final instruct|
+61|Customer#000000061|9kndve4EAJxhg3veF BfXr7AqOsT39o gtqjaYE|17|27-626-559-8599|1536.24|FURNITURE|egular packages shall have to impress along the |
+62|Customer#000000062|upJK2Dnw13,|7|17-361-978-7059|595.61|MACHINERY|kly special dolphins. pinto beans are slyly. quickly regular accounts are furiously a|
+63|Customer#000000063|IXRSpVWWZraKII|21|31-952-552-9584|9331.13|AUTOMOBILE|ithely even accounts detect slyly above the fluffily ir|
+64|Customer#000000064|MbCeGY20kaKK3oalJD,OT|3|13-558-731-7204|-646.64|BUILDING|structions after the quietly ironic theodolites cajole be|
+65|Customer#000000065|RGT yzQ0y4l0H90P783LG4U95bXQFDRXbWa1sl,X|23|33-733-623-5267|8795.16|AUTOMOBILE|y final foxes serve carefully. theodolites are carefully. pending i|
+66|Customer#000000066|XbsEqXH1ETbJYYtA1A|22|32-213-373-5094|242.77|HOUSEHOLD|le slyly accounts. carefully silent packages benea|
+67|Customer#000000067|rfG0cOgtr5W8 xILkwp9fpCS8|9|19-403-114-4356|8166.59|MACHINERY|indle furiously final, even theodo|
+68|Customer#000000068|o8AibcCRkXvQFh8hF,7o|12|22-918-832-2411|6853.37|HOUSEHOLD| pending pinto beans impress realms. final dependencies |
+69|Customer#000000069|Ltx17nO9Wwhtdbe9QZVxNgP98V7xW97uvSH1prEw|9|19-225-978-5670|1709.28|HOUSEHOLD|thely final ideas around the quickly final dependencies affix carefully quickly final theodolites. final accounts c|
+6|Customer#000000006|sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn|20|30-114-968-4951|7638.57|AUTOMOBILE|tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious|
+70|Customer#000000070|mFowIuhnHjp2GjCiYYavkW kUwOjIaTCQ|22|32-828-107-2832|4867.52|FURNITURE|fter the special asymptotes. ideas after the unusual frets cajole quickly regular pinto be|
+71|Customer#000000071|TlGalgdXWBmMV,6agLyWYDyIz9MKzcY8gl,w6t1B|7|17-710-812-5403|-611.19|HOUSEHOLD|g courts across the regular, final pinto beans are blithely pending ac|
+72|Customer#000000072|putjlmskxE,zs,HqeIA9Wqu7dhgH5BVCwDwHHcf|2|12-759-144-9689|-362.86|FURNITURE|ithely final foxes sleep always quickly bold accounts. final wat|
+73|Customer#000000073|8IhIxreu4Ug6tt5mog4|0|10-473-439-3214|4288.50|BUILDING|usual, unusual packages sleep busily along the furiou|
+74|Customer#000000074|IkJHCA3ZThF7qL7VKcrU nRLl,kylf |4|14-199-862-7209|2764.43|MACHINERY|onic accounts. blithely slow packages would haggle carefully. qui|
+75|Customer#000000075|Dh 6jZ,cwxWLKQfRKkiGrzv6pm|18|28-247-803-9025|6684.10|AUTOMOBILE| instructions cajole even, even deposits. finally bold deposits use above the even pains. slyl|
+76|Customer#000000076|m3sbCvjMOHyaOofH,e UkGPtqc4|0|10-349-718-3044|5745.33|FURNITURE|pecial deposits. ironic ideas boost blithely according to the closely ironic theodolites! furiously final deposits n|
+77|Customer#000000077|4tAE5KdMFGD4byHtXF92vx|17|27-269-357-4674|1738.87|BUILDING|uffily silent requests. carefully ironic asymptotes among the ironic hockey players are carefully bli|
+78|Customer#000000078|HBOta,ZNqpg3U2cSL0kbrftkPwzX|9|19-960-700-9191|7136.97|FURNITURE|ests. blithely bold pinto beans h|
+79|Customer#000000079|n5hH2ftkVRwW8idtD,BmM2|15|25-147-850-4166|5121.28|MACHINERY|es. packages haggle furiously. regular, special requests poach after the quickly express ideas. blithely pending re|
+7|Customer#000000007|TcGe5gaZNgVePxU5kRrvXBfkasDTea|18|28-190-982-9759|9561.95|AUTOMOBILE|ainst the ironic, express theodolites. express, even pinto beans among the exp|
+80|Customer#000000080|K,vtXp8qYB |0|10-267-172-7101|7383.53|FURNITURE|tect among the dependencies. bold accounts engage closely even pinto beans. ca|
+81|Customer#000000081|SH6lPA7JiiNC6dNTrR|20|30-165-277-3269|2023.71|BUILDING|r packages. fluffily ironic requests cajole fluffily. ironically regular theodolit|
+82|Customer#000000082|zhG3EZbap4c992Gj3bK,3Ne,Xn|18|28-159-442-5305|9468.34|AUTOMOBILE|s wake. bravely regular accounts are furiously. regula|
+83|Customer#000000083|HnhTNB5xpnSF20JBH4Ycs6psVnkC3RDf|22|32-817-154-4122|6463.51|BUILDING|ccording to the quickly bold warhorses. final, regular foxes integrate carefully. bold packages nag blithely ev|
+84|Customer#000000084|lpXz6Fwr9945rnbtMc8PlueilS1WmASr CB|11|21-546-818-3802|5174.71|FURNITURE|ly blithe foxes. special asymptotes haggle blithely against the furiously regular depo|
+85|Customer#000000085|siRerlDwiolhYR 8FgksoezycLj|5|15-745-585-8219|3386.64|FURNITURE|ronic ideas use above the slowly pendin|
+86|Customer#000000086|US6EGGHXbTTXPL9SBsxQJsuvy|0|10-677-951-2353|3306.32|HOUSEHOLD|quests. pending dugouts are carefully aroun|
+87|Customer#000000087|hgGhHVSWQl 6jZ6Ev|23|33-869-884-7053|6327.54|FURNITURE|hely ironic requests integrate according to the ironic accounts. slyly regular pla|
+88|Customer#000000088|wtkjBN9eyrFuENSMmMFlJ3e7jE5KXcg|16|26-516-273-2566|8031.44|AUTOMOBILE|s are quickly above the quickly ironic instructions; even requests about the carefully final deposi|
+89|Customer#000000089|dtR, y9JQWUO6FoJExyp8whOU|14|24-394-451-5404|1530.76|FURNITURE|counts are slyly beyond the slyly final accounts. quickly final ideas wake. r|
+8|Customer#000000008|I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5|17|27-147-574-9335|6819.74|BUILDING|among the slyly regular theodolites kindle blithely courts. carefully even theodolites haggle slyly along the ide|
+90|Customer#000000090|QxCzH7VxxYUWwfL7|16|26-603-491-1238|7354.23|BUILDING|sly across the furiously even |
+91|Customer#000000091|S8OMYFrpHwoNHaGBeuS6E 6zhHGZiprw1b7 q|8|18-239-400-3677|4643.14|AUTOMOBILE|onic accounts. fluffily silent pinto beans boost blithely according to the fluffily exp|
+92|Customer#000000092|obP PULk2LH LqNF,K9hcbNqnLAkJVsl5xqSrY,|2|12-446-416-8471|1182.91|MACHINERY|. pinto beans hang slyly final deposits. ac|
+93|Customer#000000093|EHXBr2QGdh|7|17-359-388-5266|2182.52|MACHINERY|press deposits. carefully regular platelets r|
+94|Customer#000000094|IfVNIN9KtkScJ9dUjK3Pg5gY1aFeaXewwf|9|19-953-499-8833|5500.11|HOUSEHOLD|latelets across the bold, final requests sleep according to the fluffily bold accounts. unusual deposits amon|
+95|Customer#000000095|EU0xvmWvOmUUn5J,2z85DQyG7QCJ9Xq7|15|25-923-255-2929|5327.38|MACHINERY|ithely. ruthlessly final requests wake slyly alongside of the furiously silent pinto beans. even the|
+96|Customer#000000096|vWLOrmXhRR|8|18-422-845-1202|6323.92|AUTOMOBILE|press requests believe furiously. carefully final instructions snooze carefully. |
+97|Customer#000000097|OApyejbhJG,0Iw3j rd1M|17|27-588-919-5638|2164.48|AUTOMOBILE|haggle slyly. bold, special ideas are blithely above the thinly bold theo|
+98|Customer#000000098|7yiheXNSpuEAwbswDW|12|22-885-845-6889|-551.37|BUILDING|ages. furiously pending accounts are quickly carefully final foxes: busily pe|
+99|Customer#000000099|szsrOiPtCHVS97Lt|15|25-515-237-9232|4088.65|HOUSEHOLD|cajole slyly about the regular theodolites! furiously bold requests nag along the pending, regular packages. somas|
+9|Customer#000000009|xKiAFTjUsCuxfeleNqefumTrjS|8|18-338-906-3675|8324.07|FURNITURE|r theodolites according to the requests wake thinly excuses: pending requests haggle furiousl|
diff --git a/genomix/genomix-core/src/test/resources/hadoop/conf/core-site.xml b/genomix/genomix-core/src/test/resources/hadoop/conf/core-site.xml
new file mode 100644
index 0000000..47dfac5
--- /dev/null
+++ b/genomix/genomix-core/src/test/resources/hadoop/conf/core-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
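+<!-- Default filesystem address used by the test configuration. -->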
+<property>
+ <name>fs.default.name</name>
+ <value>hdfs://127.0.0.1:31888</value>
+</property>
+<property>
+ <name>hadoop.tmp.dir</name>
+ <value>/tmp/hadoop</value>
+</property>
+
+
+</configuration>
diff --git a/genomix/genomix-core/src/test/resources/hadoop/conf/hdfs-site.xml b/genomix/genomix-core/src/test/resources/hadoop/conf/hdfs-site.xml
new file mode 100644
index 0000000..8d29b1d
--- /dev/null
+++ b/genomix/genomix-core/src/test/resources/hadoop/conf/hdfs-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+ <name>dfs.replication</name>
+ <value>1</value>
+</property>
+
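+<!-- A small block size, suited to the tiny test dataset. -->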
+<property>
+ <name>dfs.block.size</name>
+ <value>65536</value>
+</property>
+
+</configuration>
diff --git a/genomix/genomix-core/src/test/resources/hadoop/conf/log4j.properties b/genomix/genomix-core/src/test/resources/hadoop/conf/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/genomix/genomix-core/src/test/resources/hadoop/conf/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
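The root logger defaults to FATAL so namenode/datanode chatter stays out of test output; anything louder has to be asked for via -Dhadoop.root.logger. A small sketch of applying this file programmatically, assuming log4j 1.2's PropertyConfigurator (the class name and path are illustrative):

// Sketch: force the test log4j.properties before the mini-cluster starts.
import org.apache.log4j.PropertyConfigurator;

public final class TestLogging {
    private TestLogging() {}

    public static void quiet() {
        // hadoop.root.logger=FATAL,console by default; override with e.g.
        // -Dhadoop.root.logger=INFO,console when debugging a failing test.
        PropertyConfigurator.configure("src/test/resources/hadoop/conf/log4j.properties");
    }
}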
diff --git a/genomix/genomix-core/src/test/resources/hadoop/conf/mapred-site.xml b/genomix/genomix-core/src/test/resources/hadoop/conf/mapred-site.xml
new file mode 100644
index 0000000..39b6505
--- /dev/null
+++ b/genomix/genomix-core/src/test/resources/hadoop/conf/mapred-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <property>
+ <name>mapred.job.tracker</name>
+ <value>localhost:29007</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.map.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.reduce.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.max.split.size</name>
+ <value>2048</value>
+ </property>
+
+</configuration>
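mapred.max.split.size=2048 caps splits at 2 KB for input formats that honor the key, so even a tiny .tbl input fans out across many map tasks, and the 20 map/reduce slots let those tasks run concurrently against the job tracker on localhost:29007. A hedged sketch of a job wired to this file, assuming the old org.apache.hadoop.mapred API (class name and paths are illustrative):

// Sketch: run a job against the test mapred-site.xml (old mapred API).
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class TinySplitJob {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(TinySplitJob.class);
        // Picks up mapred.job.tracker, slot counts, and mapred.max.split.size.
        conf.addResource(new Path("src/test/resources/hadoop/conf/mapred-site.xml"));
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        // No mapper/reducer set: the old API defaults to the identity classes,
        // which is enough to watch the split/slot behavior in a smoke test.
        JobClient.runJob(conf);
    }
}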