fix comparator bug in VLongKmer
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3112 123451ca-8445-de46-9d55-352943316053
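The bug, in short: VLongPointable.compareTo sized the kmer payload with integer
division, so its ceil was a no-op and the last payload byte was skipped whenever
the base count was not a multiple of four; its getLong also re-read
bytes[start + 1] on every loop iteration instead of advancing. A minimal sketch
of the sizing no-op (illustrative values, not from the patch):

    int n = 7;                                        // 7 bases, needs 2 bytes
    int broken = (int) Math.ceil(n / 4);              // n / 4 == 1 (int division), ceil is a no-op
    int fixed  = (int) Math.ceil((double) n / 4.0);   // 2, the intended byte count

VLongPointable is replaced by VLongKmerPointable, which orders fields by length
first and then bytewise over the whole field, and the long decoding moves to a
single big-endian helper in KmerHashPartitioncomputerFactory.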
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
index dae1b6f..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -1,53 +0,0 @@
-package edu.uci.ics.genomix.data.normalizers;
-
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
-
-/**
- * Aggregation sort: speed up from hyracks
- *
- */
-public class Integer64NormalizedKeyComputerFactory implements
- INormalizedKeyComputerFactory {
- private static final long serialVersionUID = 8735044913496854551L;
-
- @Override
- public INormalizedKeyComputer createNormalizedKeyComputer() {
- return new INormalizedKeyComputer() {
- private static final int POSTIVE_LONG_MASK = (3 << 30);
- private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
- private static final int NEGATIVE_LONG_MASK = (0 << 30);
-
- @Override
- public int normalize(byte[] bytes, int start, int length) {
- long value = Integer64SerializerDeserializer.getLong(bytes,
- start);
- int highValue = (int) (value >> 32);
- if (highValue > 0) {
- /** * larger than Integer.MAX */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= POSTIVE_LONG_MASK;
- return highNmk;
- } else if (highValue == 0) {
- /** * smaller than Integer.MAX but >=0 */
- int lowNmk = (int) value;
- lowNmk >>= 2;
- lowNmk |= NON_NEGATIVE_INT_MASK;
- return lowNmk;
- } else {
- /** * less than 0: have not optimized for that */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= NEGATIVE_LONG_MASK;
- return highNmk;
- }
- }
-
- private int getKey(int value) {
- return value ^ Integer.MIN_VALUE;
- }
- };
- }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
index 969431c..17ca8cb 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/normalizers/VLongNormalizedKeyComputerFactory.java
@@ -1,5 +1,6 @@
package edu.uci.ics.genomix.data.normalizers;
+import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
@@ -14,25 +15,12 @@
private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
private static final int NEGATIVE_LONG_MASK = (0 << 30);
- private long getLong(byte[] bytes, int offset) {
- int l = (int) Math.ceil((double) bytes[offset] / 4.0);
- int n = (l < 8) ? l : 8;
-
- long r = 0;
- for (int i = 0; i < n; i++) {
- r <<= 8;
- r += (long) (bytes[offset + i + 1] & 0xff);
- }
-
- return r;
- }
-
/**
* one kmer
*/
@Override
public int normalize(byte[] bytes, int start, int length) {
- long value = getLong(bytes, start);
+ long value = KmerHashPartitioncomputerFactory.getLong(bytes, start);
int highValue = (int) (value >> 32);
if (highValue > 0) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
index d56c7c2..ce60917 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/partition/KmerHashPartitioncomputerFactory.java
@@ -18,6 +18,14 @@
return hash;
}
+
+ public static long getLong(byte[] bytes, int offset) {
+ return (((long) (bytes[offset] & 0xff)) << 56) + (((long) (bytes[offset + 1] & 0xff)) << 48)
+ + (((long) (bytes[offset + 2] & 0xff)) << 40) + (((long) (bytes[offset + 3] & 0xff)) << 32)
+ + (((long) (bytes[offset + 4] & 0xff)) << 24) + (((long) (bytes[offset + 5] & 0xff)) << 16)
+ + (((long) (bytes[offset + 6] & 0xff)) << 8) + (((long) (bytes[offset + 7] & 0xff)) << 0);
+ }
+
@Override
public ITuplePartitionComputer createPartitioner() {
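For reference, the new helper is a plain big-endian read of eight bytes at
offset. Assuming at least eight bytes are available there, it should behave
like this sketch (ByteBuffer defaults to big-endian byte order):

    import java.nio.ByteBuffer;

    long value = ByteBuffer.wrap(bytes, offset, 8).getLong();

Unlike the old variable-length decoder it no longer skips a leading count byte:
under the inferred layout the byte at offset lands in the most significant
position, which appears consistent with the new length-first compareTo in
VLongKmerPointable.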
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java
index 6477e14..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongBinaryHashFunctionFamily.java
@@ -1,30 +0,0 @@
-package edu.uci.ics.genomix.data.std.accessors;
-
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.data.std.api.IHashable;
-import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
-
-public class LongBinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
- private static final long serialVersionUID = 1L;
-
- @Override
- public IBinaryHashFunction createBinaryHashFunction(final int seed) {
-
- return new IBinaryHashFunction() {
- private LongPointable p = new LongPointable();
-
- @Override
- public int hash(byte[] bytes, int offset, int length) {
- if (length + offset >= bytes.length)
- throw new IllegalStateException("out of bound");
- p.set(bytes, offset, length);
- int hash = Math.abs(((IHashable) p).hash() * (seed + 1));
- if (hash < 0) {
- hash = Math.abs(hash + 1);
- }
- return hash;
- }
- };
- }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
index 661559f..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/LongHashFunctionFamily.java
@@ -1,40 +0,0 @@
-package edu.uci.ics.genomix.data.std.accessors;
-
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
-
-public class LongHashFunctionFamily implements IBinaryHashFunctionFamily {
- public static final IBinaryHashFunctionFamily INSTANCE = new LongHashFunctionFamily();
-
- private static final long serialVersionUID = 1L;
-
- static final int[] primeCoefficents = { 31, 23, 53, 97, 71, 337, 11, 877,
- 3, 29 };
-
- private LongHashFunctionFamily() {
- }
-
- @Override
- public IBinaryHashFunction createBinaryHashFunction(int seed) {
- final int coefficient = primeCoefficents[seed % primeCoefficents.length];
- final int r = primeCoefficents[(seed + 1) % primeCoefficents.length];
-
- return new IBinaryHashFunction() {
- @Override
- public int hash(byte[] bytes, int offset, int length) {
- int h = 0;
- int utflen = UTF8StringPointable.getUTFLength(bytes, offset);
- int sStart = offset + 2;
- int c = 0;
-
- while (c < utflen) {
- char ch = UTF8StringPointable.charAt(bytes, sStart + c);
- h = (coefficient * h + ch) % r;
- c += UTF8StringPointable.charSize(bytes, sStart + c);
- }
- return h;
- }
- };
- }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
index b1db6f2..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
@@ -1,77 +0,0 @@
-package edu.uci.ics.genomix.data.std.accessors;
-
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-
-public class MurmurHash3BinaryHashFunctionFamily implements
- IBinaryHashFunctionFamily {
-
- public static final IBinaryHashFunctionFamily INSTANCE = new MurmurHash3BinaryHashFunctionFamily();
-
- private static final long serialVersionUID = 1L;
-
- private MurmurHash3BinaryHashFunctionFamily() {
- }
-
- private static final int C1 = 0xcc9e2d51;
- private static final int C2 = 0x1b873593;
- private static final int C3 = 5;
- private static final int C4 = 0xe6546b64;
- private static final int C5 = 0x85ebca6b;
- private static final int C6 = 0xc2b2ae35;
-
- @Override
- public IBinaryHashFunction createBinaryHashFunction(final int seed) {
- return new IBinaryHashFunction() {
- @Override
- public int hash(byte[] bytes, int offset, int length) {
- int h = seed;
- int p = offset;
- int remain = length;
- while (remain > 4) {
- int k = ((int) bytes[p]) | (((int) bytes[p + 1]) << 8)
- | (((int) bytes[p + 2]) << 16)
- | (((int) bytes[p + 3]) << 24);
- k *= C1;
- k = Integer.rotateLeft(k, 15);
- k *= C2;
- h ^= k;
- h = Integer.rotateLeft(h, 13);
- h = h * C3 + C4;
- p += 4;
- remain -= 4;
- }
- int k = 0;
- for (int i = 0; remain > 0; i += 8) {
- k ^= bytes[p++] << i;
- remain--;
- }
- k *= C1;
- k = Integer.rotateLeft(k, 15);
- k *= C2;
- h ^= k;
- // switch (remain) {
- // case 3:
- // k = bytes[p++];
- // case 2:
- // k = (k << 8) | bytes[p++];
- // case 1:
- // k = (k << 8) | bytes[p++];
- // k *= C1;
- // k = Integer.rotateLeft(k, 15);
- // k *= C2;
- // h ^= k;
- // h = Integer.rotateLeft(h, 13);
- // h = h * C3 + C4;
- // }
- h ^= length;
- h ^= (h >>> 16);
- h *= C5;
- h ^= (h >>> 13);
- h *= C6;
- h ^= (h >>> 16);
- return h;
- }
- };
- }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java
index b9a0443..7ead93e 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/accessors/VLongBinaryHashFunctionFamily.java
@@ -1,6 +1,6 @@
package edu.uci.ics.genomix.data.std.accessors;
-import edu.uci.ics.genomix.data.std.primitive.VLongPointable;
+import edu.uci.ics.genomix.data.std.primitive.VLongKmerPointable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import edu.uci.ics.hyracks.data.std.api.IHashable;
@@ -12,14 +12,14 @@
public IBinaryHashFunction createBinaryHashFunction(final int seed) {
return new IBinaryHashFunction() {
- private VLongPointable p = new VLongPointable();
+ private VLongKmerPointable p = new VLongKmerPointable();
@Override
public int hash(byte[] bytes, int offset, int length) {
if (length + offset >= bytes.length)
throw new IllegalStateException("out of bound");
p.set(bytes, offset, length);
- int hash = Math.abs(((IHashable) p).hash() * (seed + 1));
+ int hash = Math.abs(p.hash() * (seed + 1));
if (hash < 0) {
hash = Math.abs(hash + 1);
}
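The retained guard after Math.abs is not dead code: Math.abs(Integer.MIN_VALUE)
overflows and returns Integer.MIN_VALUE, so one abs alone cannot guarantee a
non-negative hash. A minimal sketch:

    int h = Integer.MIN_VALUE;     // possible value of p.hash() * (seed + 1)
    h = Math.abs(h);               // still Integer.MIN_VALUE: abs overflowed
    if (h < 0) {
        h = Math.abs(h + 1);       // MIN_VALUE + 1 == -MAX_VALUE, abs is safe now
    }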
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongKmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongKmerPointable.java
new file mode 100644
index 0000000..cb71310
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongKmerPointable.java
@@ -0,0 +1,111 @@
+package edu.uci.ics.genomix.data.std.primitive;
+
+import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.api.AbstractPointable;
+import edu.uci.ics.hyracks.data.std.api.IComparable;
+import edu.uci.ics.hyracks.data.std.api.IHashable;
+import edu.uci.ics.hyracks.data.std.api.INumeric;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+
+public final class VLongKmerPointable extends AbstractPointable implements
+ IHashable, IComparable, INumeric {
+ public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isFixedLength() {
+ return false;
+ }
+
+ @Override
+ public int getFixedLength() {
+ return -1;
+ }
+ };
+
+ public static final IPointableFactory FACTORY = new IPointableFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IPointable createPointable() {
+ return new VLongKmerPointable();
+ }
+
+ @Override
+ public ITypeTraits getTypeTraits() {
+ return TYPE_TRAITS;
+ }
+ };
+
+ public long getLong() {
+ return KmerHashPartitioncomputerFactory.getLong(bytes, start);
+ }
+
+ @Override
+ public int compareTo(IPointable pointer) {
+ return compareTo(pointer.getByteArray(), pointer.getStartOffset(),
+ pointer.getLength());
+ }
+
+ @Override
+ public int compareTo(byte[] bytes, int start, int length) {
+
+ if (this.length != length) {
+ return this.length - length;
+ }
+
+ for (int i = 0; i < length; i++) {
+ if (this.bytes[this.start + i] < bytes[start + i]) {
+ return -1;
+ } else if (this.bytes[this.start + i] > bytes[start + i]) {
+ return 1;
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public int hash() {// BKDRHash
+ int hash = 1;
+ for (int i = start + 1; i <= start + length; i++)
+ hash = (31 * hash) + (int) bytes[i];
+ if (hash < 0) {
+ hash = -(hash + 1);
+ }
+ return hash;
+ }
+
+ @Override
+ public byte byteValue() {
+ return (byte) bytes[start + 1];
+ }
+
+ @Override
+ public short shortValue() {
+ // parenthesize the shift: '+' binds tighter than '<<', so the old form shifted by (8 + bytes[start + 1])
+ return (short) (((bytes[start + 2] & 0xff) << 8) + (bytes[start + 1] & 0xff));
+ }
+
+ @Override
+ public int intValue() {
+ return ((bytes[start + 4] & 0xff) << 24) + ((bytes[start + 3] & 0xff) << 16) + ((bytes[start + 2] & 0xff) << 8) + (bytes[start + 1] & 0xff);
+ }
+
+ @Override
+ public long longValue() {
+ return getLong();
+ }
+
+ @Override
+ public float floatValue() {
+ return getLong();
+ }
+
+ @Override
+ public double doubleValue() {
+ return getLong();
+ }
+}
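A hedged usage sketch of the new comparator; the layout (a count byte followed
by packed bases) is inferred from the deleted VLongPointable, so the literal
arrays below are illustrative only:

    VLongKmerPointable p = new VLongKmerPointable();
    byte[] a = { 8, 1, 2, 3 };                 // hypothetical count byte + payload
    byte[] b = { 8, 1, 2, 4 };
    p.set(a, 0, a.length);
    assert p.compareTo(b, 0, b.length) < 0;    // equal length: bytewise, 3 < 4
    byte[] c = { 12, 1, 2, 3, 0 };
    assert p.compareTo(c, 0, c.length) < 0;    // 4 bytes vs 5: length decides first

Note the bytewise step compares signed bytes, so payload bytes >= 0x80 sort as
negative values.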
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
index c49d6ff..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/data/std/primitive/VLongPointable.java
@@ -1,146 +0,0 @@
-package edu.uci.ics.genomix.data.std.primitive;
-
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.data.std.api.AbstractPointable;
-import edu.uci.ics.hyracks.data.std.api.IComparable;
-import edu.uci.ics.hyracks.data.std.api.IHashable;
-import edu.uci.ics.hyracks.data.std.api.INumeric;
-import edu.uci.ics.hyracks.data.std.api.IPointable;
-import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
-
-public final class VLongPointable extends AbstractPointable implements
- IHashable, IComparable, INumeric {
- public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean isFixedLength() {
- return false;
- }
-
- @Override
- public int getFixedLength() {
- return -1;
- }
- };
-
- public static final IPointableFactory FACTORY = new IPointableFactory() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public IPointable createPointable() {
- return new VLongPointable();
- }
-
- @Override
- public ITypeTraits getTypeTraits() {
- return TYPE_TRAITS;
- }
- };
-
- public static long getLong(byte[] bytes, int start) {
- int l = (int) Math.ceil((double) bytes[start] / 4.0);
- int n = (l < 8) ? l : 8;
-
- long r = 0;
- for (int i = 0; i < n; i++) {
- r <<= 8;
- r += (long) (bytes[start + 1] & 0xff);
- }
-
- return r;
- }
-
- public long getLong() {
- return getLong(bytes, start);
- }
-
- public byte[] postIncrement() {
- int i = start + 1;
- int l = (int) Math.ceil(bytes[start] / 4);
- while (i <= start + l) {
- bytes[i] += 1;
- if (bytes[i] != 0) {
- break;
- }
- i += 1;
- }
- return bytes;
- }
-
- @Override
- public int compareTo(IPointable pointer) {
- return compareTo(pointer.getByteArray(), pointer.getStartOffset(),
- pointer.getLength());
- }
-
- @Override
- public int compareTo(byte[] bytes, int start, int length) {
-
- int be = this.start;
-
- if (this.bytes[be] != bytes[start]) {
- return (this.bytes[be] < bytes[start]) ? -1 : 1;
- }
-
- int n = this.bytes[be];
- int l = (int) Math.ceil(n / 4);
- for (int i = l; i > 0; i--) {
- if (this.bytes[be + i] < bytes[start + i]) {
- return -1;
- } else if (this.bytes[be + i] > bytes[start + i]) {
- return 1;
- }
- }
- return 0;
- }
-
- @Override
- public int hash() {// BKDRHash
- int hash = 1;
- for (int i = start + 1; i <= start + length; i++)
- hash = (31 * hash) + (int) bytes[i];
- if (hash < 0) {
- hash = -hash;
- }
- // System.err.println(hash);
- return hash;
- /*
- * int seed = 131; // 31 131 1313 13131 131313 etc.. int hash = 0; int l
- * = (int) Math.ceil((double) bytes[start] / 4.0); for (int i = start +
- * 1; i <= start + l; i++) { hash = hash * seed + bytes[i]; } return
- * (hash & 0x7FFFFFFF);
- */
- }
-
- @Override
- public byte byteValue() {
- return (byte) bytes[start + 1];
- }
-
- @Override
- public short shortValue() {
-
- return (short) ((bytes[start + 2] & 0xff) << 8 + bytes[start + 1] & 0xff);
- }
-
- @Override
- public int intValue() {
- return (int) ((bytes[start + 4] & 0xff) << 24 + (bytes[start + 3] & 0xff) << 16 + (bytes[start + 2] & 0xff) << 8 + bytes[start + 1] & 0xff);
- }
-
- @Override
- public long longValue() {
- return getLong();
- }
-
- @Override
- public float floatValue() {
- return getLong();
- }
-
- @Override
- public double doubleValue() {
- return getLong();
- }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index ec71111..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -1,176 +0,0 @@
-package edu.uci.ics.genomix.dataflow;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.nio.ByteBuffer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.Path;
-
-import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.type.Kmer;
-import edu.uci.ics.genomix.type.Kmer.GENE_CODE;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-public class FileScanDescriptor extends
- AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
- private int k;
- private String pathSurfix;
- private int byteNum;
-
- public FileScanDescriptor(IOperatorDescriptorRegistry spec, int k,
- String path) {
- super(spec, 0, 1);
- this.k = k;
- this.pathSurfix = path;
-
- byteNum = (int) Math.ceil((double) k / 4.0);
- // recordDescriptors[0] = news RecordDescriptor(
- // new ISerializerDeserializer[] {
- // UTF8StringSerializerDeserializer.INSTANCE });
- recordDescriptors[0] = new RecordDescriptor(
- new ISerializerDeserializer[] { null, null });
- }
-
- public FileScanDescriptor(JobSpecification jobSpec, int kmers,
- Path[] inputPaths) {
- super(jobSpec, 0, 1);
- this.k = kmers;
- this.pathSurfix = inputPaths[0].toString();
- // recordDescriptors[0] = news RecordDescriptor(
- // new ISerializerDeserializer[] {
- // UTF8StringSerializerDeserializer.INSTANCE });
- recordDescriptors[0] = new RecordDescriptor(
- new ISerializerDeserializer[] { null,
- ByteSerializerDeserializer.INSTANCE });
- }
-
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition,
- int nPartitions) {
-
- final int temp = partition;
-
- // TODO Auto-generated method stub
- return (IOperatorNodePushable) new AbstractUnaryOutputSourceOperatorNodePushable() {
- private ArrayTupleBuilder tupleBuilder;
- private ByteBuffer outputBuffer;
- private FrameTupleAppender outputAppender;
-
- @Override
- public void initialize() {
-
- tupleBuilder = new ArrayTupleBuilder(2);
- outputBuffer = ctx.allocateFrame();
- outputAppender = new FrameTupleAppender(ctx.getFrameSize());
- outputAppender.reset(outputBuffer, true);
-
- try {// one try with multiple catch?
- writer.open();
- String s = pathSurfix + String.valueOf(temp);
-
- File tf = new File(s);
-
- File[] fa = tf.listFiles();
-
- for (int i = 0; i < fa.length; i++) {
- BufferedReader readsfile = new BufferedReader(
- new InputStreamReader(
- new FileInputStream(fa[i])));
- String read = readsfile.readLine();
- // int count = 0;
- while (read != null) {
- read = readsfile.readLine();
- // if(count % 4 == 1)
- Pattern genePattern = Pattern.compile("[AGCT]+");
- Matcher geneMatcher = genePattern.matcher(read);
- boolean isValid = geneMatcher.matches();
- if (isValid) {
- SplitReads(read.getBytes(),writer);
- }
- // count += 1;
- // System.err.println(count);
- }
- }
- if (outputAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outputBuffer, writer);
- }
- outputAppender = null;
- outputBuffer = null;
- // sort code for external sort here?
- writer.close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- throw new IllegalStateException(e);
- }
- }
-
- private void SplitReads(byte[] array, IFrameWriter writer) {
- /** first kmer */
- byte[] kmer = Kmer.CompressKmer(k, array, 0);
- byte pre = 0;
- byte next = GENE_CODE.getAdjBit(array[k]);
- InsertToFrame(kmer, pre, next, writer);
-
- /** middle kmer */
- for (int i = k; i < array.length - 1; i++) {
- pre = Kmer.MoveKmer(k, kmer, array[i]);
- next = GENE_CODE.getAdjBit(array[i + 1]);
- InsertToFrame(kmer, pre, next, writer);
-
- }
- /** last kmer */
- pre = Kmer.MoveKmer(k, kmer, array[array.length - 1]);
- next = 0;
- InsertToFrame(kmer, pre, next, writer);
- }
-
- private void InsertToFrame(byte[] kmer, byte pre, byte next,
- IFrameWriter writer) {
- try {
- byte adj = GENE_CODE.mergePreNextAdj(pre, next);
- tupleBuilder.reset();
- tupleBuilder.addField(kmer, 0, byteNum);
- tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE,
- adj);
-
- if (!outputAppender.append(
- tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- FrameUtils.flushFrame(outputBuffer, writer);
- outputAppender.reset(outputBuffer, true);
- if (!outputAppender.append(
- tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0,
- tupleBuilder.getSize())) {
- throw new IllegalStateException(
- "Failed to copy an record into a frame: the record size is too large.");
- }
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
- };
-
- }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
index 3cc4b8a..51e5221 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerSequenceWriterFactory.java
@@ -38,6 +38,7 @@
KmerCountValue reEnterCount = new KmerCountValue();
BytesWritable reEnterKey = new BytesWritable();
+
/**
* assumption is that output never change source!
*/
@@ -48,18 +49,19 @@
byte[] kmer = tuple.getFieldData(0);
int keyStart = tuple.getFieldStart(0);
int keyLength = tuple.getFieldLength(0);
-
+
byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
reEnterCount.reset(bitmap, count);
reEnterKey.set(kmer, keyStart, keyLength);
writer.append(reEnterKey, reEnterCount);
- // @mark: this method can not used for read in hadoop 0.20.2.
- //writer.appendRaw(kmer, keyStart, keyLength, reEnterCount);
+ // @mark: this method cannot be used for reads in hadoop 0.20.2.
+ // writer.appendRaw(kmer, keyStart, keyLength, reEnterCount);
} catch (IOException e) {
throw new HyracksDataException(e);
}
}
+
@Override
public void open(DataOutput output) throws HyracksDataException {
try {
@@ -70,10 +72,10 @@
throw new HyracksDataException(e);
}
}
+
@Override
public void close(DataOutput output) throws HyracksDataException {
// TODO Auto-generated method stub
-
}
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
index b93e723..0975fd2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/KMerTextWriterFactory.java
@@ -51,7 +51,6 @@
@Override
public void close(DataOutput output) throws HyracksDataException {
// TODO Auto-generated method stub
-
}
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
index f8decc3..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/PrinterOperatorDescriptor.java
@@ -1,151 +0,0 @@
-package edu.uci.ics.genomix.dataflow;
-
-import java.io.BufferedWriter;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-
-public class PrinterOperatorDescriptor extends
- AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
- private String filename;
- private boolean writeFile;
- private BufferedWriter twriter;
- private FileOutputStream stream;
-
- /**
- * The constructor of HDFSWriteOperatorDescriptor.
- *
- * @param spec
- * the JobSpecification object
- * @param conf
- * the Hadoop JobConf which contains the output path
- * @param tupleWriterFactory
- * the ITupleWriterFactory implementation object
- * @throws HyracksException
- */
- public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec) {
- super(spec, 1, 0);
- writeFile = false;
- }
-
- public PrinterOperatorDescriptor(IOperatorDescriptorRegistry spec,
- String filename) {
- super(spec, 1, 0);
- this.filename = filename;
- writeFile = true;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(
- final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider,
- final int partition, final int nPartitions)
- throws HyracksDataException {
-
- return new AbstractUnaryInputSinkOperatorNodePushable() {
- private RecordDescriptor inputRd = recordDescProvider
- .getInputRecordDescriptor(getActivityId(), 0);;
- private FrameTupleAccessor accessor = new FrameTupleAccessor(
- ctx.getFrameSize(), inputRd);
- private FrameTupleReference tuple = new FrameTupleReference();
-
- @Override
- public void open() throws HyracksDataException {
- if (true == writeFile) {
- try {
- filename = filename + String.valueOf(partition)
- + ".txt";
- // System.err.println(filename);
- stream = new FileOutputStream(filename);
- } catch (FileNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- twriter = new BufferedWriter(new OutputStreamWriter(stream));
- }
- }
-
- private void PrintBytes(int no) {
- try {
-
- byte[] bytes = tuple.getFieldData(no);
- int offset = tuple.getFieldStart(no);
- int length = tuple.getFieldLength(no);
- if (true == writeFile) {
- for (int j = offset; j < offset + length; j++) {
- twriter.write(String.valueOf((int) bytes[j]));
- twriter.write(" ");
- }
- twriter.write("&&");
- } else {
- for (int j = offset; j < offset + length; j++) {
- System.err.print(String.valueOf((int) bytes[j]));
- System.err.print(" ");
- }
- System.err.print("&&");
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer)
- throws HyracksDataException {
- try {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- tuple.reset(accessor, i);
- int tj = tuple.getFieldCount();
- for (int j = 0; j < tj; j++) {
- PrintBytes(j);
- }
- if (true == writeFile) {
- twriter.write("\r\n");
- } else {
- System.err.println("");
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
-
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (true == writeFile) {
- try {
- twriter.close();
- stream.close();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
-
- };
- }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index 6256f86..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -1,422 +0,0 @@
-package edu.uci.ics.genomix.dataflow;
-
-import java.io.File;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.genomix.data.normalizers.VLongNormalizedKeyComputerFactory;
-import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
-import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
-import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;
-import edu.uci.ics.genomix.data.std.primitive.VLongPointable;
-import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;
-import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
-import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
-import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-
-public class Tester {
-
- private static final Logger LOGGER = Logger.getLogger(Tester.class
- .getName());
- public static final String NC1_ID = "nc1";
- public static final String NC2_ID = "nc2";
- public static final String NC3_ID = "nc3";
- public static final String NC4_ID = "nc4";
-
- private static ClusterControllerService cc;
- private static NodeControllerService nc1;
- private static NodeControllerService nc2;
- private static NodeControllerService nc3;
- private static NodeControllerService nc4;
- private static IHyracksClientConnection hcc;
-
- // private static final boolean DEBUG = true;
-
- public static void main(String[] args) throws Exception {
-
- try {
- LOGGER.setLevel(Level.OFF);
-
- init();
-
- // Options options = new Options();
-
- IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1",
- 39000);
-
- /*
- * JobSpecification job =
- * createJob(parseFileSplits(options.inFileCustomerSplits),
- * parseFileSplits(options.inFileOrderSplits),
- * parseFileSplits(options.outFileSplits),
- * options.numJoinPartitions, options.algo, options.graceInputSize,
- * options.graceRecordsPerFrame, options.graceFactor,
- * options.memSize, options.tableSize, options.hasGroupBy);
- */
-
- int k, page_num;
- String file_name = args[0];
- k = Integer.parseInt(args[1]);
- page_num = Integer.parseInt(args[2]);
- int type = Integer.parseInt(args[3]);
-
- JobSpecification job = createJob(file_name, k, page_num, type);
-
- long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob("test", job);
- hcc.waitForCompletion(jobId);
- long end = System.currentTimeMillis();
- System.err.println(start + " " + end + " " + (end - start));
- } finally {
- }
- /*
- *
- * String s = "g:\\data\\results.txt" ;
- *
- * filenames = new FileOutputStream(s); // filenames = new
- * FileInputStream("filename.txt");
- *
- * BufferedWriter writer = new BufferedWriter(new
- * OutputStreamWriter(filenames)); writer.write((int) (end-start));
- * writer.close();
- */
-
- }
-
- public static void init() throws Exception {
- CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = "127.0.0.1";
- ccConfig.clientNetPort = 39000;
- ccConfig.clusterNetIpAddress = "127.0.0.1";
- ccConfig.clusterNetPort = 39001;
- ccConfig.profileDumpPeriod = -1;
- File outDir = new File("target/ClusterController");
- outDir.mkdirs();
- File ccRoot = File.createTempFile(Tester.class.getName(), ".data",
- outDir);
- ccRoot.delete();
- ccRoot.mkdir();
- ccConfig.ccRoot = ccRoot.getAbsolutePath();
- cc = new ClusterControllerService(ccConfig);
- cc.start();
- ccConfig.defaultMaxJobAttempts = 0;
-
- NCConfig ncConfig1 = new NCConfig();
- ncConfig1.ccHost = "localhost";
- ncConfig1.ccPort = 39001;
- ncConfig1.clusterNetIPAddress = "127.0.0.1";
- ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.nodeId = NC1_ID;
- nc1 = new NodeControllerService(ncConfig1);
- nc1.start();
-
-// NCConfig ncConfig2 = new NCConfig();
-// ncConfig2.ccHost = "localhost";
-// ncConfig2.ccPort = 39001;
-// ncConfig2.clusterNetIPAddress = "127.0.0.1";
-// ncConfig2.dataIPAddress = "127.0.0.1";
-// ncConfig2.nodeId = NC2_ID;
-// nc2 = new NodeControllerService(ncConfig2);
-// nc2.start();
-// NCConfig ncConfig3 = new NCConfig();
-// ncConfig3.ccHost = "localhost";
-// ncConfig3.ccPort = 39001;
-// ncConfig3.clusterNetIPAddress = "127.0.0.1";
-// ncConfig3.dataIPAddress = "127.0.0.1";
-// ncConfig3.nodeId = NC3_ID;
-// nc3 = new NodeControllerService(ncConfig3);
-// nc3.start();
-// NCConfig ncConfig4 = new NCConfig();
-// ncConfig4.ccHost = "localhost";
-// ncConfig4.ccPort = 39001;
-// ncConfig4.clusterNetIPAddress = "127.0.0.1";
-// ncConfig4.dataIPAddress = "127.0.0.1";
-// ncConfig4.nodeId = NC4_ID;
-// nc4 = new NodeControllerService(ncConfig4);
-// nc4.start();
-
- hcc = new HyracksConnection(ccConfig.clientNetIpAddress,
- ccConfig.clientNetPort);
- hcc.createApplication("test", null);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
- }
- }
-
- private static JobSpecification createJob(String filename, int k,
- int page_num, int type) throws HyracksDataException {
- JobSpecification spec = new JobSpecification();
-
- // spec.setFrameSize(32768);
- spec.setFrameSize(32768);
-
- FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);
-// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
-// NC1_ID, NC2_ID, NC3_ID, NC4_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
- NC1_ID);
-
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] { null,
- ByteSerializerDeserializer.INSTANCE,
- ByteSerializerDeserializer.INSTANCE });
- // Integer64SerializerDeserializer.INSTANCE,
- // ByteSerializerDeserializer.INSTANCE,
- // ByteSerializerDeserializer.INSTANCE });
-
- int[] keyFields = new int[] { 0 };
- int frameLimits = 4096; // hyracks oriented
- int tableSize = 10485767; // hyracks oriented
-
- AbstractOperatorDescriptor single_grouper;
- IConnectorDescriptor conn_partition;
- AbstractOperatorDescriptor cross_grouper;
-
- if (0 == type) {// external group by
- single_grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(VLongPointable.FACTORY) },
- new VLongNormalizedKeyComputerFactory(),
- new MergeKmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
-
- new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(VLongPointable.FACTORY) }),
- tableSize), true);
-
- conn_partition = new MToNPartitioningConnectorDescriptor(spec,
- new KmerHashPartitioncomputerFactory());
- cross_grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(VLongPointable.FACTORY) },
- new VLongNormalizedKeyComputerFactory(),
- new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
-
- new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(VLongPointable.FACTORY) }),
- tableSize), true);
- } else if (1 == type) {
- single_grouper = new ExternalGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(VLongPointable.FACTORY) },
- new VLongNormalizedKeyComputerFactory(),
- new MergeKmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- new DistributedMergeLmerAggregateFactory(),
- // new IntSumFieldAggregatorFactory(1, false) }),
- outputRec,
- new HashSpillableTableFactory(
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(VLongPointable.FACTORY) }),
- tableSize), true);
- conn_partition = new MToNPartitioningMergingConnectorDescriptor(
- spec,
- new KmerHashPartitioncomputerFactory(),
- keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(VLongPointable.FACTORY) });
- cross_grouper = new PreclusteredGroupOperatorDescriptor(
- spec,
- keyFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(VLongPointable.FACTORY) },
- new DistributedMergeLmerAggregateFactory(), outputRec);
- } else {
- long inputSizeInRawRecords = 154000000;
- long inputSizeInUniqueKeys = 38500000;
- int recordSizeInBytes = 4;
- int hashfuncStartLevel = 1;
- single_grouper = new HybridHashGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- inputSizeInRawRecords,
- inputSizeInUniqueKeys,
- recordSizeInBytes,
- tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(VLongPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
- // new IBinaryHashFunctionFamily[]
- // {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
- hashfuncStartLevel,
- new VLongNormalizedKeyComputerFactory(),
- new MergeKmerAggregateFactory(),
- new DistributedMergeLmerAggregateFactory(), outputRec, true);
- conn_partition = new MToNPartitioningConnectorDescriptor(spec,
- new KmerHashPartitioncomputerFactory());
- recordSizeInBytes = 13;
- cross_grouper = new HybridHashGroupOperatorDescriptor(
- spec,
- keyFields,
- frameLimits,
- inputSizeInRawRecords,
- inputSizeInUniqueKeys,
- recordSizeInBytes,
- tableSize,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(VLongPointable.FACTORY) },
- new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
- // new IBinaryHashFunctionFamily[]
- // {MurmurHash3BinaryHashFunctionFamily.INSTANCE},
- hashfuncStartLevel,
- new VLongNormalizedKeyComputerFactory(),
- new DistributedMergeLmerAggregateFactory(),
- new DistributedMergeLmerAggregateFactory(), outputRec, true);
- }
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- single_grouper, NC1_ID);
-// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-// single_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);
-
- IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
- spec);
- spec.connect(readfileConn, scan, 0, single_grouper, 0);
-
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- cross_grouper, NC1_ID);
-// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-// cross_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);
- spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);
-
- // PrinterOperatorDescriptor printer = new
- // PrinterOperatorDescriptor(spec);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec,
- "result");
-// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-// NC1_ID, NC2_ID, NC3_ID, NC4_ID);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- printer,
- NC1_ID);
-
- IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);
- spec.connect(printConn, cross_grouper, 0, printer, 0);
- // spec.connect(readfileConn, scan, 0, printer, 0);
-
- spec.addRoot(printer);
-
- if (1 == type) {
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
- }
- // System.out.println(spec.toString());
- return spec;
- }
-
- static class JoinComparatorFactory implements ITuplePairComparatorFactory {
- private static final long serialVersionUID = 1L;
-
- private final IBinaryComparatorFactory bFactory;
- private final int pos0;
- private final int pos1;
-
- public JoinComparatorFactory(IBinaryComparatorFactory bFactory,
- int pos0, int pos1) {
- this.bFactory = bFactory;
- this.pos0 = pos0;
- this.pos1 = pos1;
- }
-
- @Override
- public ITuplePairComparator createTuplePairComparator(
- IHyracksTaskContext ctx) {
- return new JoinComparator(bFactory.createBinaryComparator(), pos0,
- pos1);
- }
- }
-
- static class JoinComparator implements ITuplePairComparator {
-
- private final IBinaryComparator bComparator;
- private final int field0;
- private final int field1;
-
- public JoinComparator(IBinaryComparator bComparator, int field0,
- int field1) {
- this.bComparator = bComparator;
- this.field0 = field0;
- this.field1 = field1;
- }
-
- @Override
- public int compare(IFrameTupleAccessor accessor0, int tIndex0,
- IFrameTupleAccessor accessor1, int tIndex1) {
- int tStart0 = accessor0.getTupleStartOffset(tIndex0);
- int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
-
- int tStart1 = accessor1.getTupleStartOffset(tIndex1);
- int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
-
- int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
- int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
- int fLen0 = fEnd0 - fStart0;
-
- int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
- int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
- int fLen1 = fEnd1 - fStart1;
-
- int c = bComparator.compare(accessor0.getBuffer().array(), fStart0
- + fStartOffset0, fLen0, accessor1.getBuffer().array(),
- fStart1 + fStartOffset1, fLen1);
- if (c != 0) {
- return c;
- }
- return 0;
- }
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
index e0bd786..27066a2 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/driver/Driver.java
@@ -12,8 +12,6 @@
import edu.uci.ics.genomix.job.GenomixJob;
import edu.uci.ics.genomix.job.JobGen;
import edu.uci.ics.genomix.job.JobGenBrujinGraph;
-import edu.uci.ics.genomix.job.JobGenContigsGeneration;
-import edu.uci.ics.genomix.job.JobGenGraphCleanning;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
@@ -84,12 +82,6 @@
jobGen = new JobGenBrujinGraph(job, scheduler, ncMap,
numPartitionPerMachine);
break;
- case GRAPH_CLEANNING:
- jobGen = new JobGenGraphCleanning(job);
- break;
- case CONTIGS_GENERATION:
- jobGen = new JobGenContigsGeneration(job);
- break;
}
start = System.currentTimeMillis();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
index fd8af39..7c4dcf0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenBrujinGraph.java
@@ -11,7 +11,7 @@
import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;
import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;
import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;
-import edu.uci.ics.genomix.data.std.primitive.VLongPointable;
+import edu.uci.ics.genomix.data.std.primitive.VLongKmerPointable;
import edu.uci.ics.genomix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.genomix.dataflow.KMerSequenceWriterFactory;
import edu.uci.ics.genomix.dataflow.KMerTextWriterFactory;
@@ -110,7 +110,7 @@
keyFields,
frameLimits,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(VLongPointable.FACTORY) },
+ .of(VLongKmerPointable.FACTORY) },
new VLongNormalizedKeyComputerFactory(),
aggeragater,
new DistributedMergeLmerAggregateFactory(),
@@ -119,7 +119,7 @@
new FieldHashPartitionComputerFactory(
keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(VLongPointable.FACTORY) }),
+ .of(VLongKmerPointable.FACTORY) }),
tableSize), true);
}
@@ -137,7 +137,7 @@
recordSizeInBytes,
tableSize,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(VLongPointable.FACTORY) },
+ .of(VLongKmerPointable.FACTORY) },
new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },
hashfuncStartLevel, new VLongNormalizedKeyComputerFactory(),
new MergeKmerAggregateFactory(),
@@ -166,12 +166,12 @@
new KmerHashPartitioncomputerFactory(),
keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(VLongPointable.FACTORY) });
+ .of(VLongKmerPointable.FACTORY) });
crossGrouper = new PreclusteredGroupOperatorDescriptor(
jobSpec,
keyFields,
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(VLongPointable.FACTORY) },
+ .of(VLongKmerPointable.FACTORY) },
new DistributedMergeLmerAggregateFactory(),
combineOutputRec);
break;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
index 6d30fad..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenContigsGeneration.java
@@ -1,23 +0,0 @@
-package edu.uci.ics.genomix.job;
-
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public class JobGenContigsGeneration extends JobGen {
-
- public JobGenContigsGeneration(GenomixJob job) {
- super(job);
- }
-
- @Override
- public JobSpecification generateJob() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- protected void initJobConfiguration() {
- // TODO Auto-generated method stub
-
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
index 43b5a97..e69de29 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/job/JobGenGraphCleanning.java
@@ -1,23 +0,0 @@
-package edu.uci.ics.genomix.job;
-
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public class JobGenGraphCleanning extends JobGen {
-
- public JobGenGraphCleanning(GenomixJob job) {
- super(job);
- }
-
- @Override
- public JobSpecification generateJob() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- protected void initJobConfiguration() {
- // TODO Auto-generated method stub
-
- }
-
-}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index f872ff9..12ca9c9 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -96,7 +96,7 @@
Path dest = new Path(HDFS_INPUT_PATH);
Path result = new Path(HDFS_OUTPUT_PATH);
dfs.mkdirs(dest);
- dfs.mkdirs(result);
+ //dfs.mkdirs(result);
dfs.copyFromLocalFile(src, dest);
DataOutputStream confOutput = new DataOutputStream(
@@ -119,9 +119,9 @@
@Test
public void TestExternalGroupby() throws Exception {
- cleanUpReEntry();
+ //cleanUpReEntry();
conf.set(GenomixJob.GROUPBY_TYPE, "external");
- conf.set(GenomixJob.OUTPUT_FORMAT, "text");
+ conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
System.err.println("Testing ExternalGroupBy");
driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults());
@@ -131,7 +131,7 @@
public void TestPreClusterGroupby() throws Exception {
cleanUpReEntry();
conf.set(GenomixJob.GROUPBY_TYPE, "precluster");
- conf.set(GenomixJob.OUTPUT_FORMAT, "text");
+ conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
System.err.println("Testing PreClusterGroupBy");
driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults());
@@ -141,7 +141,7 @@
public void TestHybridGroupby() throws Exception {
cleanUpReEntry();
conf.set(GenomixJob.GROUPBY_TYPE, "hybrid");
- conf.set(GenomixJob.OUTPUT_FORMAT, "text");
+ conf.set(GenomixJob.OUTPUT_FORMAT, "binary");
System.err.println("Testing HybridGroupBy");
driver.runJob(new GenomixJob(conf), Plan.BUILD_DEBRUJIN_GRAPH, true);
Assert.assertEquals(true, checkResults());
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index f2b56fa..e82006c 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -256,7 +256,7 @@
outputAppender.reset(outputFrame, true);
- writer.open();
+ //writer.open();
if (tPointers == null) {
// Not sorted
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index 36d5f55..71ec2d1 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -39,91 +39,102 @@
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
/**
- * The HDFS file write operator using the Hadoop old API.
- * To use this operator, a user need to provide an ITupleWriterFactory.
+ * The HDFS file write operator using the Hadoop old API. To use this operator,
+ * a user needs to provide an ITupleWriterFactory.
*/
@SuppressWarnings("deprecation")
-public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+public class HDFSWriteOperatorDescriptor extends
+ AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
- private ConfFactory confFactory;
- private ITupleWriterFactory tupleWriterFactory;
+ private static final long serialVersionUID = 1L;
+ private ConfFactory confFactory;
+ private ITupleWriterFactory tupleWriterFactory;
- /**
- * The constructor of HDFSWriteOperatorDescriptor.
- *
- * @param spec
- * the JobSpecification object
- * @param conf
- * the Hadoop JobConf which contains the output path
- * @param tupleWriterFactory
- * the ITupleWriterFactory implementation object
- * @throws HyracksException
- */
- public HDFSWriteOperatorDescriptor(JobSpecification spec, JobConf conf, ITupleWriterFactory tupleWriterFactory)
- throws HyracksException {
- super(spec, 1, 0);
- this.confFactory = new ConfFactory(conf);
- this.tupleWriterFactory = tupleWriterFactory;
- }
+ /**
+ * The constructor of HDFSWriteOperatorDescriptor.
+ *
+ * @param spec
+ * the JobSpecification object
+ * @param conf
+ * the Hadoop JobConf which contains the output path
+ * @param tupleWriterFactory
+ * the ITupleWriterFactory implementation object
+ * @throws HyracksException
+ */
+ public HDFSWriteOperatorDescriptor(JobSpecification spec, JobConf conf,
+ ITupleWriterFactory tupleWriterFactory) throws HyracksException {
+ super(spec, 1, 0);
+ this.confFactory = new ConfFactory(conf);
+ this.tupleWriterFactory = tupleWriterFactory;
+ }
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
+ @Override
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider,
+ final int partition, final int nPartitions)
+ throws HyracksDataException {
- return new AbstractUnaryInputSinkOperatorNodePushable() {
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
- private FSDataOutputStream dos;
- private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
- private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
- private FrameTupleReference tuple = new FrameTupleReference();
- private ITupleWriter tupleWriter;
- private ClassLoader ctxCL;
+ private FSDataOutputStream dos;
+ private RecordDescriptor inputRd = recordDescProvider
+ .getInputRecordDescriptor(getActivityId(), 0);
+ private FrameTupleAccessor accessor = new FrameTupleAccessor(
+ ctx.getFrameSize(), inputRd);
+ private FrameTupleReference tuple = new FrameTupleReference();
+ private ITupleWriter tupleWriter;
+ private ClassLoader ctxCL;
- @Override
- public void open() throws HyracksDataException {
- ctxCL = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- JobConf conf = confFactory.getConf();
- String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
- String fileName = outputDirPath + File.separator + "part-" + partition;
+ @Override
+ public void open() throws HyracksDataException {
+ ctxCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(
+ this.getClass().getClassLoader());
+ JobConf conf = confFactory.getConf();
+ String outputDirPath = FileOutputFormat.getOutputPath(conf)
+ .toString();
+ String fileName = outputDirPath + File.separator + "part-"
+ + partition;
- tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
- try {
- FileSystem dfs = FileSystem.get(conf);
- dos = dfs.create(new Path(fileName), true);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
+ tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
+ try {
+ FileSystem dfs = FileSystem.get(conf);
+ dos = dfs.create(new Path(fileName), true);
+ tupleWriter.open(dos);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- tuple.reset(accessor, i);
- tupleWriter.write(dos, tuple);
- }
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer)
+ throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ tuple.reset(accessor, i);
+ tupleWriter.write(dos, tuple);
+ }
+ }
- @Override
- public void fail() throws HyracksDataException {
+ @Override
+ public void fail() throws HyracksDataException {
- }
+ }
- @Override
- public void close() throws HyracksDataException {
- try {
- dos.close();
- } catch (Exception e) {
- throw new HyracksDataException(e);
- } finally {
- Thread.currentThread().setContextClassLoader(ctxCL);
- }
- }
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ tupleWriter.close(dos);
+ dos.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
- };
- }
+ };
+ }
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 86ee527..d292673 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -92,6 +92,7 @@
try {
FileSystem dfs = FileSystem.get(conf.getConfiguration());
dos = dfs.create(new Path(fileName), true);
+ tupleWriter.open(dos);
} catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -115,6 +116,7 @@
@Override
public void close() throws HyracksDataException {
try {
+ tupleWriter.close(dos);
dos.close();
} catch (Exception e) {
throw new HyracksDataException(e);
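Both the hdfs (old API) and hdfs2 write operators now drive the ITupleWriter
through a full lifecycle; previously open/close were never called, so a writer
such as KMerSequenceWriterFactory's, which appears to set up its sequence-file
writer in open(...), could emit nothing usable. The order the patch
establishes, sketched with names from the diff:

    ITupleWriter tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
    FSDataOutputStream dos = dfs.create(new Path(fileName), true);
    tupleWriter.open(dos);        // writer-side setup (headers, writer state, ...)
    // per incoming frame: tupleWriter.write(dos, tuple);
    tupleWriter.close(dos);       // flush writer-side state first
    dos.close();                  // then close the underlying HDFS stream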