finish mapKmerToReadID
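
Adds Position/PositionReference (a 4-byte readID plus a 1-byte posInRead),
a MapKmerPositionToReadOperator that maps (Kmer, {(ReadID,PosInRead),...})
to (ReadID, PosInRead, {OtherPosition,...}, Kmer), and the partitioners,
hash function families, aggregators, and sequence/text writers around it.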
diff --git a/genomix/genomix-data/.classpath b/genomix/genomix-data/.classpath
index 6e26406..e43402f 100644
--- a/genomix/genomix-data/.classpath
+++ b/genomix/genomix-data/.classpath
@@ -1,10 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
- <classpathentry kind="src" output="target/classes" path="src/main/java"/>
- <classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources"/>
- <classpathentry kind="src" output="target/test-classes" path="src/test/java"/>
- <classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources"/>
- <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"/>
- <classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER"/>
+ <classpathentry kind="src" output="target/classes" path="src/main/java">
+ <attributes>
+ <attribute name="optional" value="true"/>
+ <attribute name="maven.pomderived" value="true"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
+ <attributes>
+ <attribute name="maven.pomderived" value="true"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry kind="src" output="target/test-classes" path="src/test/java">
+ <attributes>
+ <attribute name="optional" value="true"/>
+ <attribute name="maven.pomderived" value="true"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
+ <attributes>
+ <attribute name="maven.pomderived" value="true"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7">
+ <attributes>
+ <attribute name="maven.pomderived" value="true"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
+ <attributes>
+ <attribute name="maven.pomderived" value="true"/>
+ </attributes>
+ </classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/Position.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/Position.java
new file mode 100644
index 0000000..0dc24a5
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/Position.java
@@ -0,0 +1,45 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
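+/**
+ * A (readID, posInRead) pair: one kmer occurrence, identified by the read it
+ * came from and its offset within that read.
+ */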
+public class Position implements Writable {
+ public int readID;
+ public byte posInRead;
+
+ public Position() {
+ readID = 0;
+ posInRead = 0;
+ }
+
+ public Position(int readID, byte posInRead) {
+ this.readID = readID;
+ this.posInRead = posInRead;
+ }
+
+ public Position(final Position pos) {
+ this.readID = pos.readID;
+ this.posInRead = pos.posInRead;
+ }
+
+ public void set(int readID, byte posInRead) {
+ this.readID = readID;
+ this.posInRead = posInRead;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ readID = in.readInt();
+ posInRead = in.readByte();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(readID);
+ out.writeByte(posInRead);
+ }
+
+}
diff --git a/genomix/genomix-hyracks/pom.xml b/genomix/genomix-hyracks/pom.xml
index ca5cb61..ebc49f2 100644
--- a/genomix/genomix-hyracks/pom.xml
+++ b/genomix/genomix-hyracks/pom.xml
@@ -261,5 +261,6 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+
</dependencies>
</project>
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ByteSerializerDeserializer.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ByteSerializerDeserializer.java
new file mode 100644
index 0000000..3826f9b
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ByteSerializerDeserializer.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ByteSerializerDeserializer implements ISerializerDeserializer<Byte> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final ByteSerializerDeserializer INSTANCE = new ByteSerializerDeserializer();
+
+ private ByteSerializerDeserializer() {
+ }
+
+ @Override
+ public Byte deserialize(DataInput in) throws HyracksDataException {
+ try {
+ return in.readByte();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void serialize(Byte instance, DataOutput out) throws HyracksDataException {
+ try {
+ out.writeByte(instance.intValue());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public static byte getByte(byte[] bytes, int offset) {
+ return bytes[offset];
+ }
+
+ public static void putByte(byte val, byte[] bytes, int offset) {
+ bytes[offset] = val;
+ }
+
+}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerBinaryHashFunctionFamily.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerBinaryHashFunctionFamily.java
new file mode 100644
index 0000000..a4cfd3b
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerBinaryHashFunctionFamily.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public class KmerBinaryHashFunctionFamily implements IBinaryHashFunctionFamily {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction(final int seed) {
+
+ return new IBinaryHashFunction() {
+ private KmerPointable p = new KmerPointable();
+
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ if (offset + length > bytes.length)
+ throw new IllegalStateException("out of bounds");
+ p.set(bytes, offset, length);
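+ // scale the base hash by (seed + 1) so each seed yields a distinct
+ // function; negative values are folded into the non-negative range below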
+ int hash = p.hash() * (seed + 1);
+ if (hash < 0) {
+ hash = -(hash + 1);
+ }
+ return hash;
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerHashPartitioncomputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerHashPartitioncomputerFactory.java
new file mode 100644
index 0000000..eb0d6bb
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerHashPartitioncomputerFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+
+public class KmerHashPartitioncomputerFactory implements ITuplePartitionComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+
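+ /** 31-based polynomial hash over the kmer bytes; also used by KmerPointable.hash() */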
+ public static int hashBytes(byte[] bytes, int offset, int length) {
+ int hash = 1;
+ for (int i = offset; i < offset + length; i++)
+ hash = (31 * hash) + (int) bytes[i];
+ return hash;
+ }
+
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ return new ITuplePartitionComputer() {
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) {
+ int startOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);
+ int slotLength = accessor.getFieldSlotsLength();
+ int fieldLength = accessor.getFieldLength(tIndex, 0);
+
+ ByteBuffer buf = accessor.getBuffer();
+
+ int hash = hashBytes(buf.array(), startOffset + fieldOffset + slotLength, fieldLength);
+ if (hash < 0) {
+ hash = -(hash + 1);
+ }
+
+ return hash % nParts;
+ }
+ };
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerNormarlizedComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerNormarlizedComputerFactory.java
new file mode 100644
index 0000000..44b0e10
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/KmerNormarlizedComputerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class KmerNormarlizedComputerFactory implements INormalizedKeyComputerFactory {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+ return new INormalizedKeyComputer() {
+ /**
+ * read one int from the Kmer; make sure this int is consistent with the Kmer comparator
+ */
+ @Override
+ public int normalize(byte[] bytes, int start, int length) {
+ return KmerPointable.getIntReverse(bytes, start, length);
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java
new file mode 100644
index 0000000..d328bd1
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/accessors/ReadIDPartitionComputerFactory.java
@@ -0,0 +1,39 @@
+package edu.uci.ics.genomix.hyracks.data.accessors;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class ReadIDPartitionComputerFactory implements ITuplePartitionComputerFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ // partition on the integer readID stored in field 0
+ return new ITuplePartitionComputer() {
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) {
+ int startOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldOffset = accessor.getFieldStartOffset(tIndex, 0);
+ int slotLength = accessor.getFieldSlotsLength();
+
+ ByteBuffer buf = accessor.getBuffer();
+
+ int hash = IntegerSerializerDeserializer.getInt(buf.array(), startOffset + fieldOffset + slotLength);
+ if (hash < 0) {
+ hash = -(hash + 1);
+ }
+
+ return hash % nParts;
+ }
+ };
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java
new file mode 100644
index 0000000..4ceda78
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/KmerPointable.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.api.AbstractPointable;
+import edu.uci.ics.hyracks.data.std.api.IComparable;
+import edu.uci.ics.hyracks.data.std.api.IHashable;
+import edu.uci.ics.hyracks.data.std.api.INumeric;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+
+public final class KmerPointable extends AbstractPointable implements IHashable, IComparable, INumeric {
+ public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isFixedLength() {
+ return false;
+ }
+
+ @Override
+ public int getFixedLength() {
+ return -1;
+ }
+ };
+
+ public static final IPointableFactory FACTORY = new IPointableFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IPointable createPointable() {
+ return new KmerPointable();
+ }
+
+ @Override
+ public ITypeTraits getTypeTraits() {
+ return TYPE_TRAITS;
+ }
+ };
+
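+ /**
+ * The get*Reverse helpers read the trailing bytes of a kmer in little-endian
+ * order, so the values they return sort the same way as compareTo below,
+ * which treats the last byte as the most significant.
+ */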
+ public static short getShortReverse(byte[] bytes, int offset, int length) {
+ if (length < 2) {
+ return (short) (bytes[offset] & 0xff);
+ }
+ return (short) (((bytes[offset + length - 1] & 0xff) << 8) + (bytes[offset + length - 2] & 0xff));
+ }
+
+ public static int getIntReverse(byte[] bytes, int offset, int length) {
+ int shortValue = getShortReverse(bytes, offset, length) & 0xffff;
+
+ if (length < 3) {
+ return shortValue;
+ }
+ if (length == 3) {
+ return (((bytes[offset + 2] & 0xff) << 16) + ((bytes[offset + 1] & 0xff) << 8) + ((bytes[offset] & 0xff)));
+ }
+ return ((bytes[offset + length - 1] & 0xff) << 24) + ((bytes[offset + length - 2] & 0xff) << 16)
+ + ((bytes[offset + length - 3] & 0xff) << 8) + ((bytes[offset + length - 4] & 0xff) << 0);
+ }
+
+ public static long getLongReverse(byte[] bytes, int offset, int length) {
+ if (length < 8) {
+ return ((long) getIntReverse(bytes, offset, length)) & 0x0ffffffffL;
+ }
+ return (((long) (bytes[offset + length - 1] & 0xff)) << 56)
+ + (((long) (bytes[offset + length - 2] & 0xff)) << 48)
+ + (((long) (bytes[offset + length - 3] & 0xff)) << 40)
+ + (((long) (bytes[offset + length - 4] & 0xff)) << 32)
+ + (((long) (bytes[offset + length - 5] & 0xff)) << 24)
+ + (((long) (bytes[offset + length - 6] & 0xff)) << 16)
+ + (((long) (bytes[offset + length - 7] & 0xff)) << 8) + (((long) (bytes[offset + length - 8] & 0xff)));
+ }
+
+ @Override
+ public int compareTo(IPointable pointer) {
+ return compareTo(pointer.getByteArray(), pointer.getStartOffset(), pointer.getLength());
+ }
+
+ @Override
+ public int compareTo(byte[] bytes, int offset, int length) {
+
+ if (this.length != length) {
+ return this.length - length;
+ }
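+ // the most significant byte of a kmer is stored last, so compare from the end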
+ for (int i = length - 1; i >= 0; i--) {
+ int cmp = (this.bytes[this.start + i] & 0xff) - (bytes[offset + i] & 0xff);
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+
+ return 0;
+ }
+
+ @Override
+ public int hash() {
+ int hash = KmerHashPartitioncomputerFactory.hashBytes(bytes, start, length);
+ return hash;
+ }
+
+ @Override
+ public byte byteValue() {
+ return bytes[start + length - 1];
+ }
+
+ @Override
+ public short shortValue() {
+ return getShortReverse(bytes, start, length);
+ }
+
+ @Override
+ public int intValue() {
+ return getIntReverse(bytes, start, length);
+ }
+
+ @Override
+ public long longValue() {
+ return getLongReverse(bytes, start, length);
+ }
+
+ @Override
+ public float floatValue() {
+ return Float.intBitsToFloat(intValue());
+ }
+
+ @Override
+ public double doubleValue() {
+ return Double.longBitsToDouble(longValue());
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
new file mode 100644
index 0000000..0fa1489
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionListReference.java
@@ -0,0 +1,114 @@
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class PositionListReference implements Writable, IValueReference {
+ private byte[] values;
+ private int valueCount;
+ private static final byte[] EMPTY = {};
+
+ private PositionReference posIter = new PositionReference();
+
+ public PositionListReference() {
+ this.values = EMPTY;
+ this.valueCount = 0;
+ }
+
+ protected void setSize(int size) {
+ if (size > getCapacity()) {
+ setCapacity((size * 3 / 2));
+ }
+ }
+
+ protected int getCapacity() {
+ return values.length;
+ }
+
+ protected void setCapacity(int new_cap) {
+ if (new_cap > getCapacity()) {
+ byte[] new_data = new byte[new_cap];
+ if (values.length > 0) {
+ System.arraycopy(values, 0, new_data, 0, values.length);
+ }
+ values = new_data;
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.valueCount = in.readInt();
+ setSize(valueCount * PositionReference.LENGTH);
+ in.readFully(values, 0, valueCount * PositionReference.LENGTH);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(valueCount);
+ out.write(values, 0, valueCount * PositionReference.LENGTH);
+ }
+
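+ /**
+ * Returns a shared, mutable view of the i-th 5-byte entry; callers must
+ * copy it before fetching the next position.
+ */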
+ public PositionReference getPosition(int i) {
+ if (i >= valueCount) {
+ throw new ArrayIndexOutOfBoundsException("no position at index " + i);
+ }
+ posIter.setNewSpace(values, i * PositionReference.LENGTH);
+ return posIter;
+ }
+
+ public void set(PositionListReference list2) {
+ set(list2.valueCount, list2.values, 0);
+ }
+
+ public void set(int valueCount, byte[] newData, int offset) {
+ this.valueCount = valueCount;
+ setSize(valueCount * PositionReference.LENGTH);
+ if (valueCount > 0) {
+ System.arraycopy(newData, offset, values, 0, valueCount * PositionReference.LENGTH);
+ }
+ }
+
+ public void reset() {
+ valueCount = 0;
+ }
+
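+ /** appends a 5-byte position; the backing storage grows by ~1.5x when full */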
+ public void append(PositionReference pos) {
+ setSize((1 + valueCount) * PositionReference.LENGTH);
+ System.arraycopy(pos.getByteArray(), pos.getStartOffset(), values, valueCount * PositionReference.LENGTH,
+ pos.getLength());
+ valueCount += 1;
+ }
+
+ public void append(int readID, byte posInRead) {
+ setSize((1 + valueCount) * PositionReference.LENGTH);
+ IntegerSerializerDeserializer.putInt(readID, values, valueCount * PositionReference.LENGTH);
+ values[valueCount * PositionReference.LENGTH + PositionReference.INTBYTES] = posInRead;
+ valueCount += 1;
+ }
+
+ public int getPositionCount() {
+ return valueCount;
+ }
+
+ @Override
+ public byte[] getByteArray() {
+ return values;
+ }
+
+ @Override
+ public int getStartOffset() {
+ return 0;
+ }
+
+ @Override
+ public int getLength() {
+ return valueCount * PositionReference.LENGTH;
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
new file mode 100644
index 0000000..b3feaa5
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/data/primitive/PositionReference.java
@@ -0,0 +1,76 @@
+package edu.uci.ics.genomix.hyracks.data.primitive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class PositionReference implements IValueReference, Writable {
+ private byte[] storage;
+ private int offset;
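+ // fixed layout: a 4-byte readID followed by a 1-byte position within the read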
+ public static final int LENGTH = 5;
+ public static final int INTBYTES = 4;
+
+ public PositionReference() {
+ storage = new byte[LENGTH];
+ offset = 0;
+ }
+
+ public PositionReference(byte[] storage, int offset) {
+ setNewSpace(storage, offset);
+ }
+
+ public PositionReference(int readID, byte posInRead) {
+ this();
+ IntegerSerializerDeserializer.putInt(readID, storage, offset);
+ storage[offset + INTBYTES] = posInRead;
+ }
+
+ public void set(int readID, byte posInRead) {
+ IntegerSerializerDeserializer.putInt(readID, storage, offset);
+ storage[offset + INTBYTES] = posInRead;
+ }
+
+ public void setNewSpace(byte[] storage, int offset) {
+ this.storage = storage;
+ this.offset = offset;
+ }
+
+ public int getReadID() {
+ return IntegerSerializerDeserializer.getInt(storage, offset);
+ }
+
+ public byte getPosInRead() {
+ return storage[offset + INTBYTES];
+ }
+
+ @Override
+ public byte[] getByteArray() {
+ return storage;
+ }
+
+ @Override
+ public int getStartOffset() {
+ return offset;
+ }
+
+ @Override
+ public int getLength() {
+ return LENGTH;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ in.readFully(storage, offset, LENGTH);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.write(storage, offset, LENGTH);
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ConnectorPolicyAssignmentPolicy.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..fe44f51
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+/**
+ * used by precluster groupby
+ */
+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+ private static final long serialVersionUID = 1L;
+ private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+ private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+
+ @Override
+ public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+ int[] fanouts) {
+ if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+ return senderSideMaterializePolicy;
+ } else {
+ return pipeliningPolicy;
+ }
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerSequenceWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerSequenceWriterFactory.java
new file mode 100644
index 0000000..1bd427c
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerSequenceWriterFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+
+@SuppressWarnings("deprecation")
+public class KMerSequenceWriterFactory implements ITupleWriterFactory {
+
+ private static final long serialVersionUID = 1L;
+ private ConfFactory confFactory;
+ private final int kmerlength;
+
+ public KMerSequenceWriterFactory(JobConf conf) throws HyracksDataException {
+ this.confFactory = new ConfFactory(conf);
+ this.kmerlength = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ }
+
+ public class TupleWriter implements ITupleWriter {
+ public TupleWriter(ConfFactory cf) {
+ this.cf = cf;
+ }
+
+ ConfFactory cf;
+ Writer writer = null;
+
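+ // reused across write() calls to avoid per-tuple allocation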
+ KmerCountValue reEnterCount = new KmerCountValue();
+ KmerBytesWritable reEnterKey = new KmerBytesWritable(kmerlength);
+
+ /**
+ * assumption: the output stream never changes its underlying source between open() and write()
+ */
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ try {
+ byte[] kmer = tuple.getFieldData(0);
+ int keyStart = tuple.getFieldStart(0);
+ int keyLength = tuple.getFieldLength(0);
+
+ byte bitmap = tuple.getFieldData(1)[tuple.getFieldStart(1)];
+ byte count = tuple.getFieldData(2)[tuple.getFieldStart(2)];
+ reEnterCount.set(bitmap, count);
+ reEnterKey.set(kmer, keyStart, keyLength);
+ writer.append(reEnterKey, reEnterCount);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ try {
+ writer = SequenceFile.createWriter(cf.getConf(), (FSDataOutputStream) output, KmerBytesWritable.class,
+ KmerCountValue.class, CompressionType.NONE, null);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ }
+ }
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new TupleWriter(confFactory);
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerTextWriterFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerTextWriterFactory.java
new file mode 100644
index 0000000..54e84f7
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/KMerTextWriterFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+
+public class KMerTextWriterFactory implements ITupleWriterFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private KmerBytesWritable kmer;
+
+ public KMerTextWriterFactory(int k) {
+ kmer = new KmerBytesWritable(k);
+ }
+
+ public class TupleWriter implements ITupleWriter {
+ @Override
+ public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
+ try {
+ kmer.set(tuple.getFieldData(0), tuple.getFieldStart(0), tuple.getFieldLength(0));
+ output.write(kmer.toString().getBytes());
+ output.writeByte('\t');
+ output.write(GeneCode.getSymbolFromBitMap(tuple.getFieldData(1)[tuple.getFieldStart(1)]).getBytes());
+ output.writeByte('\t');
+ output.write(String.valueOf((int) tuple.getFieldData(2)[tuple.getFieldStart(2)]).getBytes());
+ output.writeByte('\n');
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void open(DataOutput output) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void close(DataOutput output) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ }
+ }
+
+ @Override
+ public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new TupleWriter();
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
new file mode 100644
index 0000000..924f455
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapKmerPositionToReadOperator.java
@@ -0,0 +1,176 @@
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class MapKmerPositionToReadOperator extends AbstractSingleActivityOperatorDescriptor {
+
+ public MapKmerPositionToReadOperator(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc) {
+ super(spec, 1, 1);
+ recordDescriptors[0] = recDesc;
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID, PosInRead, {OtherPosition,...}, Kmer).
+ * For an entry with PosInRead != 0 the attached list holds this kmer's positions
+ * whose PosInRead == 0, and vice versa.
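+ * Hypothetical example: (AATAG, {(1,0),(2,3)}) emits (1, 0, {(2,3)}, AATAG)
+ * and (2, 3, {(1,0)}, AATAG).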
+ */
+ public class MapKmerPositionToReadNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ private final IHyracksTaskContext ctx;
+ private final RecordDescriptor inputRecDesc;
+ private final RecordDescriptor outputRecDesc;
+
+ private FrameTupleAccessor accessor;
+ private ByteBuffer writeBuffer;
+ private ArrayTupleBuilder builder;
+ private FrameTupleAppender appender;
+
+ public static final int InputKmerField = 0;
+ public static final int InputPosListField = 1;
+
+ public static final int OutputReadIDField = 0;
+ public static final int OutputPosInReadField = 1;
+ public static final int OutputOtherReadIDListField = 2;
+ public static final int OutputKmerField = 3; // may not be needed
+
+ private PositionReference positionEntry;
+ private ArrayBackedValueStorage posListEntry;
+ private ArrayBackedValueStorage zeroPositionCollection;
+ private ArrayBackedValueStorage noneZeroPositionCollection;
+
+ public MapKmerPositionToReadNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
+ RecordDescriptor outputRecDesc) {
+ this.ctx = ctx;
+ this.inputRecDesc = inputRecDesc;
+ this.outputRecDesc = outputRecDesc;
+ this.positionEntry = new PositionReference();
+ this.posListEntry = new ArrayBackedValueStorage();
+ this.zeroPositionCollection = new ArrayBackedValueStorage();
+ this.noneZeroPositionCollection = new ArrayBackedValueStorage();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+ writeBuffer = ctx.allocateFrame();
+ builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(writeBuffer, true);
+ writer.open();
+ posListEntry.reset();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ scanPosition(i, zeroPositionCollection, noneZeroPositionCollection);
+ writeTuple(i, zeroPositionCollection, noneZeroPositionCollection, builder);
+ }
+ }
+
+ private void scanPosition(int tIndex, ArrayBackedValueStorage zeroPositionCollection2,
+ ArrayBackedValueStorage noneZeroPositionCollection2) {
+ zeroPositionCollection2.reset();
+ noneZeroPositionCollection2.reset();
+ byte[] data = accessor.getBuffer().array();
+ // input: Kmer, {(ReadID,PosInRead),...}
+ // output: ReadID, PosInRead, {OtherPosition,...}, Kmer
+ int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputPosListField);
+ for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
+ positionEntry.setNewSpace(data, offsetPoslist + i);
+ if (positionEntry.getPosInRead() == 0) {
+ zeroPositionCollection2.append(positionEntry);
+ } else {
+ noneZeroPositionCollection2.append(positionEntry);
+ }
+ }
+
+ }
+
+ private void writeTuple(int tIndex, ArrayBackedValueStorage zeroPositionCollection,
+ ArrayBackedValueStorage noneZeroPositionCollection, ArrayTupleBuilder builder2) {
+ byte[] data = accessor.getBuffer().array();
+ int offsetPoslist = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputPosListField);
+ for (int i = 0; i < accessor.getFieldLength(tIndex, InputPosListField); i += PositionReference.LENGTH) {
+ positionEntry.setNewSpace(data, offsetPoslist + i);
+ if (positionEntry.getPosInRead() != 0) {
+ appendNodeToBuilder(tIndex, positionEntry, zeroPositionCollection, builder2);
+ } else {
+ appendNodeToBuilder(tIndex, positionEntry, noneZeroPositionCollection, builder2);
+ }
+ }
+ }
+
+ private void appendNodeToBuilder(int tIndex, PositionReference pos, ArrayBackedValueStorage posList2,
+ ArrayTupleBuilder builder2) {
+ try {
+ builder2.addField(pos.getByteArray(), 0, PositionReference.INTBYTES);
+ builder2.addField(pos.getByteArray(), PositionReference.INTBYTES, 1);
+ // TODO: ask Yingyi whether an empty byte[] field is supported here
+ if (posList2 == null) {
+ builder2.addFieldEndOffset();
+ } else {
+ builder2.addField(posList2.getByteArray(), posList2.getStartOffset(), posList2.getLength());
+ }
+ // set the kmer field; may not be needed downstream
+ byte[] data = accessor.getBuffer().array();
+ int offsetKmer = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(tIndex, InputKmerField);
+ builder2.addField(data, offsetKmer, accessor.getFieldLength(tIndex, InputKmerField));
+
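+ // if the tuple no longer fits in the current frame, flush it and retry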
+ if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ appender.reset(writeBuffer, true);
+ if (!appender.append(builder2.getFieldEndOffsets(), builder2.getByteArray(), 0, builder2.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ builder2.reset();
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(
+ "Failed to Add a field to the tuple by copying the data bytes from a byte array.");
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+ writer.close();
+ }
+
+ }
+
+ @Override
+ public AbstractOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new MapKmerPositionToReadNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(getActivityId(),
+ 0), recordDescriptors[0]);
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
new file mode 100644
index 0000000..1bc2137
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/ReadsKeyValueParserFactory.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
+ private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
+ private KmerBytesWritable kmer;
+ private boolean bReversed;
+
+ public ReadsKeyValueParserFactory(int k, boolean bGenerateReversed) {
+ bReversed = bGenerateReversed;
+ kmer = new KmerBytesWritable(k);
+ }
+
+ @Override
+ public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+ final ByteBuffer outputBuffer = ctx.allocateFrame();
+ final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outputAppender.reset(outputBuffer, true);
+
+ return new IKeyValueParser<LongWritable, Text>() {
+
+ @Override
+ public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
+ String[] geneLine = value.toString().split("\\t"); // expected line format: readID <tab> sequence
+ if (geneLine.length != 2) {
+ return;
+ }
+ int readID = 0;
+ try {
+ readID = Integer.parseInt(geneLine[0]);
+ } catch (NumberFormatException e) {
+ LOG.warn("Invalid data");
+ return;
+ }
+
+ Pattern genePattern = Pattern.compile("[AGCT]+");
+ Matcher geneMatcher = genePattern.matcher(geneLine[1]);
+ boolean isValid = geneMatcher.matches();
+ if (isValid) {
+ SplitReads(readID, geneLine[1].getBytes(), writer);
+ }
+ }
+
+ private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
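+ // slide a window of length k along the read, emitting one tuple per
+ // offset; the reverse strand (if enabled) negates the readID and
+ // mirrors the positions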
+ /** first kmer */
+ int k = kmer.getKmerLength();
+ if (k >= array.length) {
+ return;
+ }
+ kmer.setByRead(array, 0);
+ InsertToFrame(kmer, readID, 0, writer);
+
+ /** middle kmer */
+ for (int i = k; i < array.length; i++) {
+ kmer.shiftKmerWithNextChar(array[i]);
+ InsertToFrame(kmer, readID, i - k + 1, writer);
+ }
+
+ if (bReversed) {
+ /** first kmer */
+ kmer.setByReadReverse(array, 0);
+ InsertToFrame(kmer, -readID, array.length - k, writer);
+ /** middle kmer */
+ for (int i = k; i < array.length; i++) {
+ kmer.shiftKmerWithPreChar(array[i]);
+ InsertToFrame(kmer, -readID, array.length - i - 1, writer);
+ }
+ }
+ }
+
+ private void InsertToFrame(KmerBytesWritable kmer, int readID, int posInRead, IFrameWriter writer) {
+ try {
+ if (posInRead > 127) {
+ throw new IllegalArgumentException("position in read exceeds 127 for read " + readID);
+ }
+ tupleBuilder.reset();
+ tupleBuilder.addField(kmer.getBytes(), 0, kmer.getLength());
+ tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, readID);
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, (byte)posInRead);
+
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new IllegalStateException(
+ "Failed to copy an record into a frame: the record size is too large.");
+ }
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void open(IFrameWriter writer) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void close(IFrameWriter writer) throws HyracksDataException {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+ };
+ }
+
+}
\ No newline at end of file
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggerateReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggerateReadIDAggregateFactory.java
new file mode 100644
index 0000000..2ed4c9b
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggerateReadIDAggregateFactory.java
@@ -0,0 +1,76 @@
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class AggerateReadIDAggregateFactory implements IAggregatorDescriptorFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return new IAggregatorDescriptor() {
+
+ @Override
+ public AggregateState createAggregateStates() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ };
+ }
+
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
new file mode 100644
index 0000000..7dc4947
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -0,0 +1,104 @@
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class AggregateKmerAggregateFactory implements IAggregatorDescriptorFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ return new IAggregatorDescriptor() {
+ private PositionReference position = new PositionReference();
+
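+ // absolute offset of the given field within the frame's backing array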
+ protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ return offset;
+ }
+
+ protected byte readByteField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ return ByteSerializerDeserializer.getByte(accessor.getBuffer().array(),
+ getOffSet(accessor, tIndex, fieldId));
+ }
+
+ protected int readIntField(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ return IntegerSerializerDeserializer.getInt(accessor.getBuffer().array(),
+ getOffSet(accessor, tIndex, fieldId));
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new ArrayBackedValueStorage());
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
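+ // start a fresh position list seeded with this tuple's (readID, posInRead)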
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ inputVal.reset();
+ position.set(readIntField(accessor, tIndex, 1), readByteField(accessor, tIndex, 2));
+ inputVal.append(position);
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ position.set(readIntField(accessor, tIndex, 1), readByteField(accessor, tIndex, 2));
+ inputVal.append(position);
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("partial result method should not be called");
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ try {
+ fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ };
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
new file mode 100644
index 0000000..ec2b47c
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ return new IAggregatorDescriptor() {
+
+ private PositionReference position = new PositionReference();
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new ArrayBackedValueStorage());
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
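+ // field 1 carries a packed run of 5-byte (readID, posInRead) entries; copy each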
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ inputVal.reset();
+ int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH) {
+ position.setNewSpace(accessor.getBuffer().array(), leadOffset + offset);
+ inputVal.append(position);
+ }
+ }
+
+ @Override
+ public void reset() {
+ // no-op: the running position list is re-initialized in init()
+
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex, 1); offset += PositionReference.LENGTH) { // append this tuple's positions onto the group's running list
+ position.setNewSpace(accessor.getBuffer().array(), leadOffset + offset);
+ inputVal.append(position);
+ }
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("partial result method should not be called");
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ try {
+ fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ @Override
+ public void close() {
+ // no-op: nothing to release
+
+ }
+
+ };
+
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
new file mode 100644
index 0000000..b83aaf7
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/aggregators/MergeReadIDAggregateFactory.java
@@ -0,0 +1,5 @@
+package edu.uci.ics.genomix.hyracks.dataflow.aggregators;
+
+public class MergeReadIDAggregateFactory {
+ // placeholder: the ReadID merge aggregator is not implemented in this commit
+}
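
MergeReadIDAggregateFactory is committed here as an empty placeholder. A plausible completion would mirror MergeKmerAggregateFactory above, concatenating each group's payload field into an ArrayBackedValueStorage. A hedged sketch only: the payload field index and the use of the raw field bytes (via getDataOutput()) are assumptions, since the ReadID record layout is not finished in this commit.

    package edu.uci.ics.genomix.hyracks.dataflow.aggregators;

    import java.io.DataOutput;
    import java.io.IOException;

    import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
    import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
    import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
    import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
    import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
    import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

    public class MergeReadIDAggregateFactory implements IAggregatorDescriptorFactory {
        private static final long serialVersionUID = 1L;

        private static final int VALUE_FIELD = 1; // assumption: payload sits in field 1, as in the kmer aggregators

        @Override
        public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
                RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
                throws HyracksDataException {
            return new IAggregatorDescriptor() {

                @Override
                public AggregateState createAggregateStates() {
                    return new AggregateState(new ArrayBackedValueStorage());
                }

                private void appendValueField(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
                        throws HyracksDataException {
                    ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
                    int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
                    int start = leadOffset + accessor.getFieldStartOffset(tIndex, VALUE_FIELD);
                    int end = leadOffset + accessor.getFieldEndOffset(tIndex, VALUE_FIELD);
                    try {
                        // concatenate this tuple's raw payload bytes onto the group's running value
                        inputVal.getDataOutput().write(accessor.getBuffer().array(), start, end - start);
                    } catch (IOException e) {
                        throw new HyracksDataException(e);
                    }
                }

                @Override
                public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
                        AggregateState state) throws HyracksDataException {
                    ((ArrayBackedValueStorage) state.state).reset();
                    appendValueField(accessor, tIndex, state);
                }

                @Override
                public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
                        int stateTupleIndex, AggregateState state) throws HyracksDataException {
                    appendValueField(accessor, tIndex, state);
                }

                @Override
                public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
                        int tIndex, AggregateState state) throws HyracksDataException {
                    throw new IllegalStateException("partial result method should not be called");
                }

                @Override
                public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
                        int tIndex, AggregateState state) throws HyracksDataException {
                    ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
                    DataOutput fieldOutput = tupleBuilder.getDataOutput();
                    try {
                        // emit the concatenated payload as a single output field
                        fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
                        tupleBuilder.addFieldEndOffset();
                    } catch (IOException e) {
                        throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
                    }
                }

                @Override
                public void reset() {
                    // no-op: state is re-initialized in init()
                }

                @Override
                public void close() {
                    // no-op: nothing to release
                }
            };
        }
    }
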
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
new file mode 100644
index 0000000..d958205
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.driver;
+
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.job.JobGen;
+import edu.uci.ics.genomix.hyracks.job.JobGenBrujinGraph;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class Driver {
+ public static enum Plan {
+ BUILD_DEBRUJIN_GRAPH,
+ GRAPH_CLEANNING,
+ CONTIGS_GENERATION,
+ }
+
+ private static final String IS_PROFILING = "genomix.driver.profiling";
+ private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
+ private static final Log LOG = LogFactory.getLog(Driver.class);
+ private JobGen jobGen;
+ private boolean profiling;
+
+ private int numPartitionPerMachine;
+
+ private IHyracksClientConnection hcc;
+ private Scheduler scheduler;
+
+ public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
+ try {
+ hcc = new HyracksConnection(ipAddress, port);
+ scheduler = new Scheduler(hcc.getNodeControllerInfos());
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ this.numPartitionPerMachine = numPartitionPerMachine;
+ }
+
+ public void runJob(GenomixJob job) throws HyracksException {
+ runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
+ }
+
+ public void runJob(GenomixJob job, Plan planChoice, boolean profiling) throws HyracksException {
+ /** add hadoop configurations */
+ URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+ job.addResource(hadoopCore);
+ URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+ job.addResource(hadoopMapRed);
+ URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+ job.addResource(hadoopHdfs);
+
+ LOG.info("job started");
+ long start = System.currentTimeMillis();
+ long end = start;
+ long time = 0;
+
+ this.profiling = profiling;
+ try {
+ Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
+ LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
+ switch (planChoice) {
+ case BUILD_DEBRUJIN_GRAPH:
+ default:
+ jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
+ }
+
+ start = System.currentTimeMillis();
+ runCreate(jobGen);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("result writing finished " + time + "ms");
+ LOG.info("job finished");
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ private void runCreate(JobGen jobGen) throws Exception {
+ try {
+ JobSpecification createJob = jobGen.generateJob();
+ execute(createJob);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ private void execute(JobSpecification job) throws Exception {
+ job.setUseConnectorPolicyForScheduling(false);
+ JobId jobId = hcc.startJob(job,
+ profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ hcc.waitForCompletion(jobId);
+ }
+
+ public static void main(String[] args) throws Exception {
+ GenomixJob jobConf = new GenomixJob();
+ String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
+ if (otherArgs.length < 4) {
+ System.err.println("Need <serverIP> <port> <input> <output>");
+ System.exit(-1);
+ }
+ String ipAddress = otherArgs[0];
+ int port = Integer.parseInt(otherArgs[1]);
+ int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
+ boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
+ // FileInputFormat.setInputPaths(job, otherArgs[2]);
+ {
+ Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
+ jobConf.set("mapred.input.dir", path.toString());
+
+ Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
+ jobConf.set("mapred.output.dir", outputDir.toString());
+ }
+ // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
+ // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
+ Driver driver = new Driver(ipAddress, port, numOfDuplicate);
+ driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
+ }
+}
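
Programmatic use of the driver mirrors the main() wiring above; a minimal sketch, where the cluster controller host/port, partition count, and directory names are placeholders:

    import org.apache.hadoop.fs.Path;

    import edu.uci.ics.genomix.hyracks.driver.Driver;
    import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
    import edu.uci.ics.genomix.hyracks.job.GenomixJob;

    public class RunGraphBuild {
        public static void main(String[] args) throws Exception {
            GenomixJob job = new GenomixJob();
            // input reads and output directory, resolved against the job's working directory
            job.set("mapred.input.dir", new Path(job.getWorkingDirectory(), "reads").toString());
            job.set("mapred.output.dir", new Path(job.getWorkingDirectory(), "graph").toString());

            Driver driver = new Driver("127.0.0.1", 1099, 2); // host, port, partitions per machine
            driver.runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false); // no runtime profiling
        }
    }
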
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
new file mode 100644
index 0000000..66f0d1e
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/GenomixJob.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+
+public class GenomixJob extends JobConf {
+
+ public static final String JOB_NAME = "genomix";
+
+ /** Kmer length */
+ public static final String KMER_LENGTH = "genomix.kmer";
+ /** Frame size */
+ public static final String FRAME_SIZE = "genomix.framesize";
+ /** Frame limit, required by Hyracks */
+ public static final String FRAME_LIMIT = "genomix.framelimit";
+ /** Table size, required by Hyracks */
+ public static final String TABLE_SIZE = "genomix.tablesize";
+ /** Group-by type */
+ public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
+ /** Graph output format */
+ public static final String OUTPUT_FORMAT = "genomix.graph.output";
+ /** Whether to also generate the reversed kmer sequence */
+ public static final String REVERSED_KMER = "genomix.kmer.reversed";
+
+ /** Configurations used by the hybrid group-by function in the graph build phase */
+ public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
+ public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
+ public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
+ public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
+ public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
+
+ public static final int DEFAULT_KMER = 21;
+ public static final int DEFAULT_FRAME_SIZE = 32768;
+ public static final int DEFAULT_FRAME_LIMIT = 4096;
+ public static final int DEFAULT_TABLE_SIZE = 10485767;
+ public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
+ public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
+ public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
+ public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
+ public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
+
+ public static final boolean DEFAULT_REVERSED = false;
+
+ public static final String DEFAULT_GROUPBY_TYPE = "hybrid";
+ public static final String DEFAULT_OUTPUT_FORMAT = "binary";
+
+ public GenomixJob() throws IOException {
+ super(new Configuration());
+ }
+
+ public GenomixJob(Configuration conf) throws IOException {
+ super(conf);
+ }
+
+ /**
+ * Set the kmer length
+ *
+ * @param kmerlength
+ * the desired kmer length
+ */
+ final public void setKmerLength(int kmerlength) {
+ setInt(KMER_LENGTH, kmerlength);
+ }
+
+ final public void setFrameSize(int frameSize) {
+ setInt(FRAME_SIZE, frameSize);
+ }
+
+ final public void setFrameLimit(int frameLimit) {
+ setInt(FRAME_LIMIT, frameLimit);
+ }
+
+ final public void setTableSize(int tableSize) {
+ setInt(TABLE_SIZE, tableSize);
+ }
+
+}
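
In practice a job is configured through these setters plus the inherited Configuration API. A sketch with illustrative values only (the group-by and output-format strings are the ones recognized by JobGenBrujinGraph below):

    import java.io.IOException;

    import edu.uci.ics.genomix.hyracks.job.GenomixJob;

    public class ConfigureJob {
        public static void main(String[] args) throws IOException {
            GenomixJob job = new GenomixJob();
            job.setKmerLength(21); // kept odd by JobGenBrujinGraph; even values are decremented
            job.setFrameSize(GenomixJob.DEFAULT_FRAME_SIZE);
            job.setFrameLimit(GenomixJob.DEFAULT_FRAME_LIMIT);
            job.setTableSize(GenomixJob.DEFAULT_TABLE_SIZE);
            job.set(GenomixJob.GROUPBY_TYPE, "precluster"); // "external", "precluster", or "hybrid"
            job.set(GenomixJob.OUTPUT_FORMAT, "text");      // "text" or "binary"
            job.setBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
        }
    }
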
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
new file mode 100644
index 0000000..3563f93
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGen.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public abstract class JobGen {
+
+ protected final Configuration conf;
+ protected final GenomixJob genomixJob;
+ protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+
+ public JobGen(GenomixJob job) {
+ this.conf = job;
+ this.genomixJob = job;
+ this.initJobConfiguration();
+ }
+
+ protected abstract void initJobConfiguration();
+
+ public abstract JobSpecification generateJob() throws HyracksException;
+
+}
\ No newline at end of file
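
The contract is deliberately small: the constructor runs initJobConfiguration() before generateJob() can ever be called, so subclasses pull their settings from conf there. A do-nothing subclass, as a sketch of the hook order:

    import edu.uci.ics.genomix.hyracks.job.GenomixJob;
    import edu.uci.ics.genomix.hyracks.job.JobGen;
    import edu.uci.ics.hyracks.api.exceptions.HyracksException;
    import edu.uci.ics.hyracks.api.job.JobSpecification;

    public class NoopJobGen extends JobGen {
        public NoopJobGen(GenomixJob job) {
            super(job); // super() invokes initJobConfiguration() below
        }

        @Override
        protected void initJobConfiguration() {
            // read settings from this.conf here; nothing is needed for a no-op job
        }

        @Override
        public JobSpecification generateJob() throws HyracksException {
            return new JobSpecification(); // an empty job spec
        }
    }
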
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
new file mode 100644
index 0000000..6eb5bac
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -0,0 +1,310 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerBinaryHashFunctionFamily;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.genomix.hyracks.dataflow.KMerSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.KMerTextWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenBrujinGraph extends JobGen {
+ public enum GroupbyType {
+ EXTERNAL,
+ PRECLUSTER,
+ HYBRIDHASH,
+ }
+
+ public enum OutputFormat {
+ TEXT,
+ BINARY,
+ }
+
+ JobConf job;
+ private static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+ private Scheduler scheduler;
+ private String[] ncNodeNames;
+
+ private int kmers;
+ private int frameLimits;
+ private int frameSize;
+ private int tableSize;
+ private GroupbyType groupbyType;
+ private OutputFormat outputFormat;
+ private boolean bGenerateReversedKmer;
+
+ private AbstractOperatorDescriptor singleGrouper;
+ private IConnectorDescriptor connPartition;
+ private AbstractOperatorDescriptor crossGrouper;
+ private RecordDescriptor readOutputRec;
+ private RecordDescriptor combineOutputRec;
+
+ /** sizing parameters for the hybrid hash group-by */
+ private long inputSizeInRawRecords;
+ private long inputSizeInUniqueKeys;
+ private int recordSizeInBytes;
+ private int hashfuncStartLevel;
+
+ private void logDebug(String status) {
+ LOG.debug(status + " nc nodes:" + ncNodeNames.length);
+ }
+
+ public JobGenBrujinGraph(GenomixJob job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) {
+ super(job);
+ this.scheduler = scheduler;
+ String[] nodes = new String[ncMap.size()];
+ ncMap.keySet().toArray(nodes);
+ ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+ for (int i = 0; i < numPartitionPerMachine; i++) {
+ System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
+ }
+ logDebug("initialize");
+ }
+
+ private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+ IAggregatorDescriptorFactory aggregator) {
+ return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+ new KmerNormarlizedComputerFactory(), aggregator, new MergeKmerAggregateFactory(), combineOutputRec,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(KmerPointable.FACTORY) }), tableSize), true);
+ }
+
+ private HybridHashGroupOperatorDescriptor newHybridGroupby(JobSpecification jobSpec, int[] keyFields,
+ long inputSizeInRawRecords, long inputSizeInUniqueKeys, int recordSizeInBytes, int hashfuncStartLevel,
+ IAggregatorDescriptorFactory aggregator) throws HyracksDataException {
+ return new HybridHashGroupOperatorDescriptor(jobSpec, keyFields, frameLimits, inputSizeInRawRecords,
+ inputSizeInUniqueKeys, recordSizeInBytes, tableSize,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+ new IBinaryHashFunctionFamily[] { new KmerBinaryHashFunctionFamily() }, hashfuncStartLevel,
+ new KmerNormarlizedComputerFactory(), aggregator, new MergeKmerAggregateFactory(), combineOutputRec, true);
+ }
+
+ private void generateKmerAggregateDescriptorByType(JobSpecification jobSpec) throws HyracksDataException {
+ int[] keyFields = new int[] { 0 }; // the id of grouped key
+
+ switch (groupbyType) {
+ case EXTERNAL:
+ singleGrouper = newExternalGroupby(jobSpec, keyFields, new AggregateKmerAggregateFactory());
+ connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
+ crossGrouper = newExternalGroupby(jobSpec, keyFields, new MergeKmerAggregateFactory());
+ break;
+ case PRECLUSTER:
+ default:
+ singleGrouper = newExternalGroupby(jobSpec, keyFields, new AggregateKmerAggregateFactory());
+ connPartition = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+ new KmerHashPartitioncomputerFactory(), keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) });
+ crossGrouper = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+ new MergeKmerAggregateFactory(), combineOutputRec);
+ break;
+ case HYBRIDHASH:
+ singleGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
+ recordSizeInBytes, hashfuncStartLevel, new AggregateKmerAggregateFactory());
+ connPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new KmerHashPartitioncomputerFactory());
+
+ crossGrouper = newHybridGroupby(jobSpec, keyFields, inputSizeInRawRecords, inputSizeInUniqueKeys,
+ recordSizeInBytes, hashfuncStartLevel, new MergeKmerAggregateFactory());
+ break;
+ }
+ }
+
+ public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+ try {
+
+ InputSplit[] splits = job.getInputFormat().getSplits(job, ncNodeNames.length);
+
+ LOG.info("HDFS read into " + splits.length + " splits");
+ String[] readSchedule = scheduler.getLocationConstraints(splits);
+ return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, job, splits, readSchedule,
+ new ReadsKeyValueParserFactory(kmers, bGenerateReversedKmer));
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private AbstractOperatorDescriptor newGroupByReadOperator(JobSpecification jobSpec, RecordDescriptor nodeOutputRec) {
+ // TODO: build the group-by-ReadID operator; blocked on MergeReadIDAggregateFactory, which is still a stub
+ return null;
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ readOutputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { null, null, null });
+ combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+ jobSpec.setFrameSize(frameSize);
+
+ // File input
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("ReadKmer Operator");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
+
+ generateKmerAggregateDescriptorByType(jobSpec);
+ logDebug("LocalKmerGroupby Operator");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, singleGrouper, ncNodeNames);
+
+ IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
+ jobSpec.connect(readfileConn, readOperator, 0, singleGrouper, 0);
+
+ logDebug("CrossKmerGroupby Operator");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, crossGrouper, ncNodeNames);
+ jobSpec.connect(connPartition, singleGrouper, 0, crossGrouper, 0);
+
+ logDebug("Map Kmer to Read Operator");
+ //Map (Kmer, {(ReadID,PosInRead),...}) into (ReadID,PosInRead,Kmer,{OtherReadID,...})
+ RecordDescriptor mapToReadOutputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { null, null, null, null });
+ AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec, mapToReadOutputRec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, mapKmerToRead, ncNodeNames);
+ IConnectorDescriptor mapReadConn = new OneToOneConnectorDescriptor(jobSpec);
+ jobSpec.connect(mapReadConn, crossGrouper, 0, mapKmerToRead, 0);
+
+ logDebug("Group by Read Operator");
+ // (ReadID,PosInRead,Kmer,{OtherReadID,...})
+ RecordDescriptor nodeOutputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] { null, null, null, null });
+ AbstractOperatorDescriptor groupbyReadOperator = newGroupByReadOperator(jobSpec, nodeOutputRec);
+ IConnectorDescriptor readPartition = new MToNPartitioningConnectorDescriptor(jobSpec, new ReadIDPartitionComputerFactory());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, groupbyReadOperator, ncNodeNames);
+ jobSpec.connect(readPartition, mapKmerToRead, 0, groupbyReadOperator, 0);
+
+ // Output
+ ITupleWriterFactory writer = null;
+ switch (outputFormat) {
+ case TEXT:
+ writer = new KMerTextWriterFactory(kmers);
+ break;
+ case BINARY:
+ default:
+ writer = new KMerSequenceWriterFactory(job);
+ break;
+ }
+ HDFSWriteOperatorDescriptor writeOperator = new HDFSWriteOperatorDescriptor(jobSpec, job, writer);
+
+ logDebug("WriteOperator");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames);
+
+ IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+ jobSpec.connect(printConn, groupbyReadOperator, 0, writeOperator, 0);
+ jobSpec.addRoot(writeOperator);
+
+ if (groupbyType == GroupbyType.PRECLUSTER) {
+ jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ }
+ return jobSpec;
+ }
+
+
+
+ @Override
+ protected void initJobConfiguration() {
+
+ kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ if (kmers % 2 == 0) { // an even k would let a kmer equal its own reverse complement; keep k odd
+ kmers--;
+ conf.setInt(GenomixJob.KMER_LENGTH, kmers);
+ }
+ frameLimits = conf.getInt(GenomixJob.FRAME_LIMIT, GenomixJob.DEFAULT_FRAME_LIMIT);
+ tableSize = conf.getInt(GenomixJob.TABLE_SIZE, GenomixJob.DEFAULT_TABLE_SIZE);
+ frameSize = conf.getInt(GenomixJob.FRAME_SIZE, GenomixJob.DEFAULT_FRAME_SIZE);
+ inputSizeInRawRecords = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTSIZE,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTSIZE);
+ inputSizeInUniqueKeys = conf.getLong(GenomixJob.GROUPBY_HYBRID_INPUTKEYS,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_INPUTKEYS);
+ recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_SINGLE,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE);
+ hashfuncStartLevel = conf.getInt(GenomixJob.GROUPBY_HYBRID_HASHLEVEL,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_HASHLEVEL);
+ /** note: recordSizeInBytes is read again with the cross-phase value here, overwriting the single-phase value above */
+ recordSizeInBytes = conf.getInt(GenomixJob.GROUPBY_HYBRID_RECORDSIZE_CROSS,
+ GenomixJob.DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS);
+
+ bGenerateReversedKmer = conf.getBoolean(GenomixJob.REVERSED_KMER, GenomixJob.DEFAULT_REVERSED);
+
+ String type = conf.get(GenomixJob.GROUPBY_TYPE, GenomixJob.DEFAULT_GROUPBY_TYPE);
+ if (type.equalsIgnoreCase("external")) {
+ groupbyType = GroupbyType.EXTERNAL;
+ } else if (type.equalsIgnoreCase("precluster")) {
+ groupbyType = GroupbyType.PRECLUSTER;
+ } else {
+ groupbyType = GroupbyType.HYBRIDHASH;
+ }
+
+ String output = conf.get(GenomixJob.OUTPUT_FORMAT, GenomixJob.DEFAULT_OUTPUT_FORMAT);
+ if (output.equalsIgnoreCase("text")) {
+ outputFormat = OutputFormat.TEXT;
+ } else {
+ outputFormat = OutputFormat.BINARY;
+ }
+ job = new JobConf(conf);
+ LOG.info("Genomix Graph Build Configuration");
+ LOG.info("Kmer:" + kmers);
+ LOG.info("Groupby type:" + type);
+ LOG.info("Output format:" + output);
+ LOG.info("Frame limit" + frameLimits);
+ LOG.info("Frame size" + frameSize);
+ }
+
+}
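
One subtlety above: initJobConfiguration() silently decrements an even kmer length. The likely motive (an inference, consistent with the KmerNormarlizedComputerFactory used for canonical kmers) is that only an odd k guarantees a kmer never equals its own reverse complement. A quick stand-alone check:

    public class PalindromeCheck {
        // complement over the ACGT alphabet (A<->T, C<->G)
        static char comp(char c) {
            switch (c) {
                case 'A': return 'T';
                case 'T': return 'A';
                case 'C': return 'G';
                default:  return 'C'; // 'G'
            }
        }

        static String reverseComplement(String kmer) {
            StringBuilder sb = new StringBuilder();
            for (int i = kmer.length() - 1; i >= 0; i--) {
                sb.append(comp(kmer.charAt(i)));
            }
            return sb.toString();
        }

        public static void main(String[] args) {
            // even k: "ACGT" is its own reverse complement -> prints true
            System.out.println("ACGT".equals(reverseComplement("ACGT")));
            // odd k: the middle base would have to equal its own complement -> prints false
            System.out.println("ACG".equals(reverseComplement("ACG")));
        }
    }
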
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
new file mode 100644
index 0000000..540bdaf
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenStatistic.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.job;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.hyracks.util.ByteComparatorFactory;
+import edu.uci.ics.genomix.hyracks.util.StatCountAggregateFactory;
+import edu.uci.ics.genomix.hyracks.util.StatReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.util.StatSumAggregateFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenStatistic extends JobGen {
+ private int kmers;
+ private JobConf hadoopjob;
+ private RecordDescriptor readOutputRec;
+ private String[] ncNodeNames; // note: neither this nor scheduler is initialized yet in this commit
+ private Scheduler scheduler;
+ private RecordDescriptor combineOutputRec;
+
+ public JobGenStatistic(GenomixJob job) {
+ super(job);
+ // configuration is handled by initJobConfiguration(), invoked from the JobGen constructor
+ }
+
+ @Override
+ protected void initJobConfiguration() {
+ // the statistics job reads back the binary graph output, hence the SequenceFile input format
+ kmers = conf.getInt(GenomixJob.KMER_LENGTH, GenomixJob.DEFAULT_KMER);
+ hadoopjob = new JobConf(conf);
+ hadoopjob.setInputFormat(SequenceFileInputFormat.class);
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+ int[] degreeFields = { 0, 1 }; // indegree, outdegree
+ int[] countFields = { 2 };
+ JobSpecification jobSpec = new JobSpecification();
+ /** specify the record fields after read */
+ readOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
+ ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE });
+ combineOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { ByteSerializerDeserializer.INSTANCE,
+ ByteSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ /** the reader */
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, readOperator, ncNodeNames);
+
+ /** the combiner aggregator */
+ AbstractOperatorDescriptor degreeLocal = connectLocalAggregateByField(jobSpec, degreeFields, readOperator);
+ AbstractOperatorDescriptor countLocal = connectLocalAggregateByField(jobSpec, countFields, readOperator);
+
+ /** the final aggregator */
+ AbstractOperatorDescriptor degreeMerger = connectFinalAggregateByField(jobSpec, degreeFields, degreeLocal);
+ AbstractOperatorDescriptor countMerger = connectFinalAggregateByField(jobSpec, countFields, countLocal);
+
+ /** writer */
+ AbstractFileWriteOperatorDescriptor writeDegree = connectWriter(jobSpec, degreeFields, degreeMerger);
+ AbstractFileWriteOperatorDescriptor writeCount = connectWriter(jobSpec, countFields, countMerger);
+ jobSpec.addRoot(writeDegree);
+ jobSpec.addRoot(writeCount);
+ return jobSpec;
+ }
+
+ private HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+ try {
+
+ InputSplit[] splits = hadoopjob.getInputFormat().getSplits(hadoopjob, ncNodeNames.length);
+
+ String[] readSchedule = scheduler.getLocationConstraints(splits);
+ return new HDFSReadOperatorDescriptor(jobSpec, readOutputRec, hadoopjob, splits, readSchedule,
+ new StatReadsKeyValueParserFactory());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+ IAggregatorDescriptorFactory aggeragater) {
+ return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, GenomixJob.DEFAULT_FRAME_LIMIT,
+ new IBinaryComparatorFactory[] { new ByteComparatorFactory(), new ByteComparatorFactory() }, null,
+ aggeragater, new StatSumAggregateFactory(), combineOutputRec, new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+ new ByteComparatorFactory(), new ByteComparatorFactory() }),
+ GenomixJob.DEFAULT_TABLE_SIZE), true);
+ }
+
+ private AbstractOperatorDescriptor connectLocalAggregateByField(JobSpecification jobSpec, int[] fields,
+ HDFSReadOperatorDescriptor readOperator) {
+ AbstractOperatorDescriptor localAggregator = newExternalGroupby(jobSpec, fields,
+ new StatCountAggregateFactory());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, localAggregator, ncNodeNames);
+ IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(jobSpec);
+ jobSpec.connect(readfileConn, readOperator, 0, localAggregator, 0);
+ return localAggregator;
+ }
+
+ private AbstractOperatorDescriptor connectFinalAggregateByField(JobSpecification jobSpec, int[] fields,
+ AbstractOperatorDescriptor localAggregator) {
+ AbstractOperatorDescriptor finalAggregator = newExternalGroupby(jobSpec, fields, new StatSumAggregateFactory());
+ // only need one reducer
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, finalAggregator, ncNodeNames[fields[0]
+ % ncNodeNames.length]);
+ IConnectorDescriptor mergeConn = new MToNPartitioningMergingConnectorDescriptor(jobSpec,
+ new ITuplePartitionComputerFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ return new ITuplePartitionComputer() {
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+ throws HyracksDataException {
+ return 0; // route every tuple to partition 0 so a single node computes the final aggregate
+ }
+ };
+ }
+ }, fields, new IBinaryComparatorFactory[] { new ByteComparatorFactory() });
+ jobSpec.connect(mergeConn, localAggregator, 0, finalAggregator, 0);
+ return finalAggregator;
+ }
+
+ private AbstractFileWriteOperatorDescriptor connectWriter(JobSpecification jobSpec, int[] fields,
+ AbstractOperatorDescriptor finalAggregator) {
+ LineFileWriteOperatorDescriptor writeOperator = new LineFileWriteOperatorDescriptor(jobSpec, null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, writeOperator, ncNodeNames[fields[0]
+ % ncNodeNames.length]);
+
+ IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(jobSpec);
+ jobSpec.connect(printConn, finalAggregator, 0, writeOperator, 0);
+ return writeOperator;
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/ByteComparatorFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/ByteComparatorFactory.java
new file mode 100644
index 0000000..b070b56
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/ByteComparatorFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.util;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+
+public class ByteComparatorFactory implements IBinaryComparatorFactory, IBinaryHashFunctionFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return b1[s1] - b2[s2]; // keys are single bytes, so compare just the first byte (lengths ignored)
+ }
+
+ };
+ }
+
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction() {
+ return new IBinaryHashFunction() {
+
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ return bytes[offset]; // a single-byte key hashes to itself
+ }
+
+ };
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatCountAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatCountAggregateFactory.java
new file mode 100644
index 0000000..f483a9c
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatCountAggregateFactory.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class StatCountAggregateFactory implements IAggregatorDescriptorFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public class CountAggregator implements IAggregatorDescriptor {
+ private final int[] keyFields;
+
+ public CountAggregator(int[] keyFields) {
+ this.keyFields = keyFields;
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ // the running count lives in the grouped tuple itself, so no separate state object is needed
+ return null;
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ int count = 1;
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
+ }
+ }
+
+ @Override
+ public void reset() {
+ // no-op: the count field is rewritten by init()
+
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ int count = 1; // each incoming tuple contributes one more occurrence
+
+ int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+ int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, keyFields.length); // the count sits right after the group keys
+ int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
+
+ byte[] data = stateAccessor.getBuffer().array();
+ count += IntegerSerializerDeserializer.getInt(data, countoffset);
+ IntegerSerializerDeserializer.putInt(count, data, countoffset);
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+
+ }
+
+ protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, keyFields.length);
+ int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+ byte[] data = accessor.getBuffer().array();
+
+ return IntegerSerializerDeserializer.getInt(data, countoffset);
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ outputPartialResult(tupleBuilder, accessor, tIndex, state);
+ }
+
+ @Override
+ public void close() {
+ // no-op: nothing to release
+
+ }
+
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ // stateless factory: return a fresh aggregator bound to the grouping keys
+ return new CountAggregator(keyFields);
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatReadsKeyValueParserFactory.java
new file mode 100644
index 0000000..c01aae1
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatReadsKeyValueParserFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.util;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.ByteSerializerDeserializer;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerCountValue;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+public class StatReadsKeyValueParserFactory implements IKeyValueParserFactory<KmerBytesWritable, KmerCountValue> {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IKeyValueParser<KmerBytesWritable, KmerCountValue> createKeyValueParser(IHyracksTaskContext ctx)
+ throws HyracksDataException {
+
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+ final ByteBuffer outputBuffer = ctx.allocateFrame();
+ final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outputAppender.reset(outputBuffer, true);
+
+ return new IKeyValueParser<KmerBytesWritable, KmerCountValue>() {
+
+ @Override
+ public void open(IFrameWriter writer) throws HyracksDataException {
+ // no-op: nothing to set up
+
+ }
+
+ @Override
+ public void parse(KmerBytesWritable key, KmerCountValue value, IFrameWriter writer)
+ throws HyracksDataException {
+ byte adjMap = value.getAdjBitMap();
+ byte count = value.getCount();
+ insertToFrame((byte) (GeneCode.inDegree(adjMap)), (byte) (GeneCode.outDegree(adjMap)), count, writer);
+ }
+
+ @Override
+ public void close(IFrameWriter writer) throws HyracksDataException {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+
+ private void insertToFrame(byte indegree, byte outdegree, byte count, IFrameWriter writer) {
+ try {
+ tupleBuilder.reset();
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, indegree);
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, outdegree);
+ tupleBuilder.addField(ByteSerializerDeserializer.INSTANCE, count);
+
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer); // frame full: flush it and retry once
+ outputAppender.reset(outputBuffer, true);
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new IllegalStateException(
+ "Failed to copy an record into a frame: the record size is too large.");
+ }
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatSumAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatSumAggregateFactory.java
new file mode 100644
index 0000000..fb37056
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/util/StatSumAggregateFactory.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class StatSumAggregateFactory implements IAggregatorDescriptorFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public class DistributeAggregatorDescriptor implements IAggregatorDescriptor {
+
+ private final int[] keyFields;
+
+ public DistributeAggregatorDescriptor(int[] keyFields) {
+ this.keyFields = keyFields;
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ // the running sum lives in the grouped tuple itself, so no separate state object is needed
+ return null;
+ }
+
+ protected int getCount(IFrameTupleAccessor accessor, int tIndex) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, 1);
+ int countoffset = tupleOffset + accessor.getFieldSlotsLength() + fieldStart;
+ byte[] data = accessor.getBuffer().array();
+ return IntegerSerializerDeserializer.getInt(data, countoffset);
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
+
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when initializing the aggregator.");
+ }
+ }
+
+ @Override
+ public void reset() {
+ // no-op: the sum field is rewritten by init()
+
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
+
+ int statetupleOffset = stateAccessor.getTupleStartOffset(stateTupleIndex);
+ int countfieldStart = stateAccessor.getFieldStartOffset(stateTupleIndex, 1); // the running sum is field 1 of the state tuple
+ int countoffset = statetupleOffset + stateAccessor.getFieldSlotsLength() + countfieldStart;
+
+ byte[] data = stateAccessor.getBuffer().array();
+ count += IntegerSerializerDeserializer.getInt(data, countoffset);
+ IntegerSerializerDeserializer.putInt(count, data, countoffset);
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ int count = getCount(accessor, tIndex);
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ try {
+ fieldOutput.writeInt(count);
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ outputPartialResult(tupleBuilder, accessor, tIndex, state);
+
+ }
+
+ @Override
+ public void close() {
+ // no-op: nothing to release
+
+ }
+
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ // stateless factory: return a fresh aggregator bound to the grouping keys
+ return new DistributeAggregatorDescriptor(keyFields);
+ }
+
+}
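
StatCountAggregateFactory and StatSumAggregateFactory together form a classic two-level aggregation: each node counts per key locally, then the single merge partition sums the partial counts. The arithmetic in miniature, with plain maps standing in for frames:

    import java.util.HashMap;
    import java.util.Map;

    public class TwoLevelCount {
        public static void main(String[] args) {
            // two partitions of keys, e.g. degree values observed on two nodes
            int[][] partitions = { { 3, 3, 5 }, { 3, 5, 5, 5 } };
            Map<Integer, Integer> total = new HashMap<Integer, Integer>();
            for (int[] partition : partitions) {
                // local phase (StatCountAggregateFactory): count occurrences per key
                Map<Integer, Integer> local = new HashMap<Integer, Integer>();
                for (int key : partition) {
                    Integer c = local.get(key);
                    local.put(key, c == null ? 1 : c + 1);
                }
                // final phase (StatSumAggregateFactory): sum the partial counts
                for (Map.Entry<Integer, Integer> e : local.entrySet()) {
                    Integer t = total.get(e.getKey());
                    total.put(e.getKey(), t == null ? e.getValue() : t + e.getValue());
                }
            }
            System.out.println(total); // prints {3=3, 5=4}
        }
    }
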
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
index 847272a..4a279d6 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/example/jobrun/JobRunTest.java
@@ -39,9 +39,9 @@
import org.junit.Before;
import org.junit.Test;
-import edu.uci.ics.genomix.driver.Driver;
-import edu.uci.ics.genomix.driver.Driver.Plan;
-import edu.uci.ics.genomix.job.GenomixJob;
+import edu.uci.ics.genomix.hyracks.driver.Driver;
+import edu.uci.ics.genomix.hyracks.driver.Driver.Plan;
+import edu.uci.ics.genomix.hyracks.job.GenomixJob;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerCountValue;