Merge branch 'genomix/fullstack_genomix' into nanzhang/hyracks_genomix
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java
deleted file mode 100644
index 3826f86..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package edu.uci.ics.genomix.oldtype;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import edu.uci.ics.genomix.type.KmerListWritable;
-import edu.uci.ics.genomix.type.PositionWritable;
-
-public class IntermediateNodeWritable implements WritableComparable<IntermediateNodeWritable>, Serializable{
-
- private static final long serialVersionUID = 1L;
- public static final IntermediateNodeWritable EMPTY_NODE = new IntermediateNodeWritable();
-
- private KmerListWritable forwardForwardList;
- private KmerListWritable forwardReverseList;
- private KmerListWritable reverseForwardList;
- private KmerListWritable reverseReverseList;
- private PositionWritable nodeId;
-
- public IntermediateNodeWritable(){
- forwardForwardList = new KmerListWritable();
- forwardReverseList = new KmerListWritable();
- reverseForwardList = new KmerListWritable();
- reverseReverseList = new KmerListWritable();
- nodeId = new PositionWritable();
- }
-
- public IntermediateNodeWritable(KmerListWritable FFList, KmerListWritable FRList,
- KmerListWritable RFList, KmerListWritable RRList, PositionWritable uniqueKey) {
- this();
- set(FFList, FRList, RFList, RRList, uniqueKey);
- }
-
- public void set(IntermediateNodeWritable node){
- set(node.forwardForwardList, node.forwardReverseList, node.reverseForwardList,
- node.reverseReverseList, node.nodeId);
- }
-
- public void set(KmerListWritable FFList, KmerListWritable FRList,
- KmerListWritable RFList, KmerListWritable RRList, PositionWritable uniqueKey) {
- this.forwardForwardList.set(FFList);
- this.forwardReverseList.set(FRList);
- this.reverseForwardList.set(RFList);
- this.reverseReverseList.set(RRList);
- this.nodeId.set(uniqueKey);
- }
-
- public KmerListWritable getFFList() {
- return forwardForwardList;
- }
-
- public void setFFList(KmerListWritable forwardForwardList) {
- this.forwardForwardList.set(forwardForwardList);
- }
-
- public KmerListWritable getFRList() {
- return forwardReverseList;
- }
-
- public void setFRList(KmerListWritable forwardReverseList) {
- this.forwardReverseList.set(forwardReverseList);
- }
-
- public KmerListWritable getRFList() {
- return reverseForwardList;
- }
-
- public void setRFList(KmerListWritable reverseForwardList) {
- this.reverseForwardList.set(reverseForwardList);
- }
-
- public KmerListWritable getRRList() {
- return reverseReverseList;
- }
-
- public void setRRList(KmerListWritable reverseReverseList) {
- this.reverseReverseList.set(reverseReverseList);
- }
-
- public PositionWritable getNodeId() {
- return nodeId;
- }
-
- public void setNodeId(PositionWritable nodeId) {
- this.nodeId.set(nodeId);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.forwardForwardList.readFields(in);
- this.forwardReverseList.readFields(in);
- this.reverseForwardList.readFields(in);
- this.reverseReverseList.readFields(in);
- this.nodeId.readFields(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- this.forwardForwardList.write(out);
- this.forwardReverseList.write(out);
- this.reverseForwardList.write(out);
- this.reverseReverseList.write(out);
- this.nodeId.write(out);
- }
-
- @Override
- public int compareTo(IntermediateNodeWritable other) {
- // TODO Auto-generated method stub
- return this.nodeId.compareTo(other.nodeId);
- }
-
- @Override
- public int hashCode() {
- return this.nodeId.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof IntermediateNodeWritable) {
- IntermediateNodeWritable nw = (IntermediateNodeWritable) o;
- return (this.forwardForwardList.equals(nw.forwardForwardList)
- && this.forwardReverseList.equals(nw.forwardReverseList)
- && this.reverseForwardList.equals(nw.reverseForwardList)
- && this.reverseReverseList.equals(nw.reverseReverseList) && (this.nodeId.equals(nw.nodeId)));
- }
- return false;
- }
-
- @Override
- public String toString() {
- StringBuilder sbuilder = new StringBuilder();
- sbuilder.append('(');
- sbuilder.append(nodeId.toString()).append('\t');
- sbuilder.append(forwardForwardList.toString()).append('\t');
- sbuilder.append(forwardReverseList.toString()).append('\t');
- sbuilder.append(reverseForwardList.toString()).append('\t');
- sbuilder.append(reverseReverseList.toString()).append('\t').append(')');
- return sbuilder.toString();
- }
-}
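
The deleted class above follows the standard Hadoop Writable contract: write() and readFields() must serialize the same fields in the same order. A minimal sketch of that contract, using plain JDK and Hadoop types rather than the genomix ones (the class name here is illustrative):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Minimal illustration of the Writable round-trip contract the deleted class
// implemented: readFields() must consume fields in exactly the order write()
// produced them.
public class PairWritable implements Writable {
    private long readId;
    private int posId;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(readId); // field 1
        out.writeInt(posId);   // field 2
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        readId = in.readLong(); // field 1, mirroring write()
        posId = in.readInt();   // field 2
    }
}
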
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index f14056c..c3e53e8 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -60,7 +60,7 @@
* Initial Kmer space by kmerlength
*
* @param k
- * kmerlength
+ *            kmerlength
*/
public KmerBytesWritable(int k) {
this.kmerlength = k;
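
For context, a hedged usage sketch of the constructor this javadoc describes, restricted to methods that appear elsewhere in this diff (setByRead, shiftKmerWithNextChar, toString); the read bytes are illustrative:

// Sliding a k-length window across a read, as the parser below does.
byte[] read = "ACGTACGT".getBytes();
int k = 5;
KmerBytesWritable kmer = new KmerBytesWritable(k); // reserve space for k bases
kmer.setByRead(read, 0);                           // first kmer: ACGTA
for (int i = k; i < read.length; i++) {
    kmer.shiftKmerWithNextChar(read[i]);           // drop first base, append read[i]
    System.out.println(kmer.toString());
}
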
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
index 3650553..b101312 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
@@ -24,9 +24,10 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import edu.uci.ics.genomix.oldtype.IntermediateNodeWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -44,23 +45,32 @@
private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
public static final int OutputKmerField = 0;
- public static final int OutputNodeIdField = 1;
+ public static final int outputNodeIdListField = 1;
public static final int OutputForwardForwardField = 2;
- public static final int OutputForwardReverseField = 3;
- public static final int OutputReverseForwardField = 4;
- public static final int OutputReverseReverseField = 5;
+ public static final int OutputFFListCountField = 3;
+ public static final int OutputForwardReverseField = 4;
+ public static final int OutputFRListCountField = 5;
+ public static final int OutputReverseForwardField = 6;
+ public static final int OutputRFListCountField = 7;
+ public static final int OutputReverseReverseField = 8;
+ public static final int OutputRRListCountField = 9;
private final int readLength;
private final int kmerSize;
public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
- null });
+ null, null, null, null, null, null, null, null});
- public ReadsKeyValueParserFactory(int readlength, int k, boolean bGenerateReversed) {
+ public ReadsKeyValueParserFactory(int readlength, int k) {
this.readLength = readlength;
this.kmerSize = k;
}
-
+
+ public static enum KmerDir {
+ FORWARD,
+ REVERSE,
+ }
+
@Override
public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
        final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(10); // one slot per output field: kmer, nodeIdList, 4 edge lists and their counts
@@ -70,12 +80,24 @@
return new IKeyValueParser<LongWritable, Text>() {
- private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
- private KmerBytesWritable nextKmer = new KmerBytesWritable(kmerSize);
private PositionWritable nodeId = new PositionWritable();
- private KmerListWritable kmerList = new KmerListWritable(kmerSize);
- private IntermediateNodeWritable interMediateNode = new IntermediateNodeWritable();
- private byte mateId = 0;
+ private PositionListWritable nodeIdList = new PositionListWritable();
+ private KmerListWritable edgeListForPreKmer = new KmerListWritable(kmerSize);
+ private KmerListWritable edgeListForNextKmer = new KmerListWritable(kmerSize);
+ private NodeWritable outputNode = new NodeWritable(kmerSize);
+
+ private KmerBytesWritable preForwardKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable preReverseKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable curForwardKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable curReverseKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable nextForwardKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable nextReverseKmer = new KmerBytesWritable(kmerSize);
+
+ private KmerDir preKmerDir = KmerDir.FORWARD;
+ private KmerDir curKmerDir = KmerDir.FORWARD;
+ private KmerDir nextKmerDir = KmerDir.FORWARD;
+
+            private byte mateId = (byte) 0;
@Override
public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
@@ -108,50 +130,167 @@
if (kmerSize >= array.length) {
return;
}
- kmer.setByRead(array, 0);
- nextKmer.set(kmer);
- nextKmer.shiftKmerWithNextChar(array[kmerSize]);
- kmerList.append(nextKmer);
- nextKmer.toString();
- kmerList.toString();
-// nodeId.set(mateId, readID, 1);
-// interMediateNode.setNodeId(nodeId);
-// interMediateNode.setFFList(kmerList);
- InsertToFrame(kmer, kmerList, writer);
+ outputNode.reset(kmerSize);
+ curForwardKmer.setByRead(array, 0);
+ curReverseKmer.setByReadReverse(array, 0);
+ curKmerDir = curForwardKmer.compareTo(curReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+ setNextKmer(array[kmerSize]);
+                setNodeId(mateId, readID, 1);
+ setEdgeListForNextKmer();
+ writeToFrame(writer);
/** middle kmer */
int i = kmerSize;
for (; i < array.length - 1; i++) {
- kmer.shiftKmerWithNextChar(array[i]);
- nextKmer.set(kmer);
- nextKmer.shiftKmerWithNextChar(array[i+1]);
- kmerList.append(nextKmer);
-// nodeId.set(mateId, readID, i - kmerSize + 2);
-// interMediateNode.setNodeId(nodeId);
-// interMediateNode.setFFList(kmerList);
- InsertToFrame(kmer, kmerList, writer);
+ outputNode.reset(kmerSize);
+ setPreKmerByOldCurKmer();
+ setCurKmerByOldNextKmer();
+                    setNextKmer(array[i + 1]);
+                    setNodeId(mateId, readID, i - kmerSize + 2);
+ setEdgeListForPreKmer();
+ setEdgeListForNextKmer();
+ writeToFrame(writer);
}
-//
-// /** last kmer */
-// kmer.shiftKmerWithNextChar(array[i]);
-// nodeId.set(mateId, readID, i - kmerSize + 2);
-// interMediateNode.setNodeId(nodeId);
-// InsertToFrame(kmer, interMediateNode, writer);
+
+ /** last kmer */
+ outputNode.reset(kmerSize);
+ setPreKmerByOldCurKmer();
+ setCurKmerByOldNextKmer();
+                setNodeId(mateId, readID, array.length - kmerSize + 1);
+ setEdgeListForPreKmer();
+ writeToFrame(writer);
}
- //IntermediateNodeWritable node
- private void InsertToFrame(KmerBytesWritable kmer, KmerListWritable kmerList, IFrameWriter writer) {
+
+            public void setNodeId(byte mateId, long readID, int posId) {
+ nodeId.set(mateId, readID, posId);
+ nodeIdList.reset();
+ nodeIdList.append(nodeId);
+ outputNode.setNodeIdList(nodeIdList);
+ }
+
+ public void setNextKmer(byte nextChar){
+ nextForwardKmer.set(curForwardKmer);
+ nextForwardKmer.shiftKmerWithNextChar(nextChar);
+                nextReverseKmer.setByReadReverse(nextForwardKmer.toString().getBytes(), 0); // the String's bytes start at offset 0, not at the kmer's internal offset
+ nextKmerDir = nextForwardKmer.compareTo(nextReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+ }
+
+ public void setPreKmerByOldCurKmer(){
+ preKmerDir = curKmerDir;
+ preForwardKmer.set(curForwardKmer);
+ preReverseKmer.set(curReverseKmer);
+ }
+
+ public void setCurKmerByOldNextKmer(){
+ curKmerDir = nextKmerDir;
+ curForwardKmer.set(nextForwardKmer);
+ curReverseKmer.set(nextReverseKmer);
+ }
+
+ public void writeToFrame(IFrameWriter writer) {
+ switch(curKmerDir){
+ case FORWARD:
+ InsertToFrame(curForwardKmer, outputNode, writer);
+ break;
+ case REVERSE:
+ InsertToFrame(curReverseKmer, outputNode, writer);
+ break;
+ }
+ }
+ public void setEdgeListForPreKmer(){
+ switch(curKmerDir){
+ case FORWARD:
+ switch(preKmerDir){
+ case FORWARD:
+ edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.append(preForwardKmer);
+ outputNode.setRRList(edgeListForPreKmer);
+ break;
+ case REVERSE:
+ edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.append(preReverseKmer);
+ outputNode.setRFList(edgeListForPreKmer);
+ break;
+ }
+ break;
+ case REVERSE:
+ switch(preKmerDir){
+ case FORWARD:
+ edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.append(preForwardKmer);
+ outputNode.setFRList(edgeListForPreKmer);
+ break;
+ case REVERSE:
+ edgeListForPreKmer.reset(kmerSize);
+ edgeListForPreKmer.append(preReverseKmer);
+ outputNode.setFFList(edgeListForPreKmer);
+ break;
+ }
+ break;
+ }
+ }
+
+ public void setEdgeListForNextKmer(){
+ switch(curKmerDir){
+ case FORWARD:
+ switch(nextKmerDir){
+ case FORWARD:
+ edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.append(nextForwardKmer);
+ outputNode.setFFList(edgeListForNextKmer);
+ break;
+ case REVERSE:
+ edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.append(nextReverseKmer);
+ outputNode.setFRList(edgeListForNextKmer);
+ break;
+ }
+ break;
+ case REVERSE:
+ switch(nextKmerDir){
+ case FORWARD:
+ edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.append(nextForwardKmer);
+ outputNode.setRFList(edgeListForNextKmer);
+ break;
+ case REVERSE:
+ edgeListForNextKmer.reset(kmerSize);
+ edgeListForNextKmer.append(nextReverseKmer);
+ outputNode.setRRList(edgeListForNextKmer);
+ break;
+ }
+ break;
+ }
+ }
+
+ private void InsertToFrame(KmerBytesWritable kmer, NodeWritable node, IFrameWriter writer) {
try {
-// if (Math.abs(node.getNodeId().getPosId()) > 32768) {
-// throw new IllegalArgumentException("Position id is beyond 32768 at " + node.getNodeId().getReadId());
-// }
tupleBuilder.reset();
tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
- tupleBuilder.addField(kmerList.getByteArray(), kmer.getOffset(), kmer.getLength());
- //tupleBuilder.addField(node.getNodeId().getByteArray(), node.getNodeId().getStartOffset(), node.getNodeId().getLength());
+
+ //tupleBuilder.addField(node.getnodeId().getByteArray(), node.getnodeId().getStartOffset(), node.getnodeId().getLength());
// tupleBuilder.addField(node.getFFList().getByteArray(), node.getFFList().getStartOffset(), node.getFFList().getLength());
// tupleBuilder.addField(node.getFRList().getByteArray(), node.getFRList().getStartOffset(), node.getFRList().getLength());
// tupleBuilder.addField(node.getRFList().getByteArray(), node.getRFList().getStartOffset(), node.getRFList().getLength());
// tupleBuilder.addField(node.getRRList().getByteArray(), node.getRRList().getStartOffset(), node.getRRList().getLength());
+
+ tupleBuilder.addField(node.getNodeIdList().getByteArray(), node.getNodeIdList().getStartOffset(), node.getNodeIdList().getLength());
+
+ tupleBuilder.addField(node.getFFList().getByteArray(), node.getFFList().getStartOffset(), node.getFFList().getLength());
+ tupleBuilder.getDataOutput().writeInt(node.getFFList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(node.getFRList().getByteArray(), node.getFRList().getStartOffset(), node.getFRList().getLength());
+ tupleBuilder.getDataOutput().writeInt(node.getFRList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(node.getRFList().getByteArray(), node.getRFList().getStartOffset(), node.getRFList().getLength());
+ tupleBuilder.getDataOutput().writeInt(node.getRFList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(node.getRRList().getByteArray(), node.getRRList().getStartOffset(), node.getRRList().getLength());
+ tupleBuilder.getDataOutput().writeInt(node.getRRList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
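
The parser above now emits every kmer in a canonical direction: whichever of the forward kmer and its reverse complement compares smaller becomes the key, and the edge lists (FF/FR/RF/RR) are chosen accordingly. A self-contained sketch of that normalization, with plain Strings standing in for KmerBytesWritable (helper names are illustrative, not genomix API):

public class KmerCanonicalizer {
    enum KmerDir { FORWARD, REVERSE }

    // Reverse complement: walk the kmer backwards, complementing each base.
    static String reverseComplement(String kmer) {
        StringBuilder sb = new StringBuilder(kmer.length());
        for (int i = kmer.length() - 1; i >= 0; i--) {
            switch (kmer.charAt(i)) {
                case 'A': sb.append('T'); break;
                case 'T': sb.append('A'); break;
                case 'C': sb.append('G'); break;
                case 'G': sb.append('C'); break;
                default: throw new IllegalArgumentException("unexpected base");
            }
        }
        return sb.toString();
    }

    // FORWARD when the forward form is the smaller of the pair, matching
    // curForwardKmer.compareTo(curReverseKmer) <= 0 in the parser above.
    static KmerDir canonicalDir(String forward) {
        return forward.compareTo(reverseComplement(forward)) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
    }

    public static void main(String[] args) {
        System.out.println(canonicalDir("ACT")); // FORWARD: ACT <= AGT
        System.out.println(canonicalDir("TGA")); // REVERSE: TCA < TGA
    }
}
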
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
new file mode 100644
index 0000000..cfd582b
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class AggregateKmerAggregateFactory implements IAggregatorDescriptorFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ private final int readLength;
+ private final int kmerSize;
+
+ public AggregateKmerAggregateFactory(int readlength, int k) {
+ this.readLength = readlength;
+ this.kmerSize = k;
+ }
+
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ return new IAggregatorDescriptor() {
+// private PositionReference position = new PositionReference();
+
+ private NodeWritable readNode = new NodeWritable(kmerSize);
+
+            protected int getOffset(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ return offset;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new NodeWritable());
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ NodeWritable localUniNode = (NodeWritable) state.state;
+ localUniNode.reset(kmerSize);
+                readNode.setNewReference(accessor.getBuffer().array(), getOffset(accessor, tIndex, 1)); // field 1 carries the serialized NodeWritable
+ localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
+ localUniNode.getFFList().appendList(readNode.getFFList());
+ localUniNode.getFRList().appendList(readNode.getFRList());
+ localUniNode.getRFList().appendList(readNode.getRFList());
+ localUniNode.getRRList().appendList(readNode.getRRList());
+// inputVal.append(position);
+
+                // add an empty field so the caller sees a complete tuple; the real payload is written in outputFinalResult()
+                tupleBuilder.addFieldEndOffset();
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ NodeWritable localUniNode = (NodeWritable) state.state;
+                readNode.setNewReference(accessor.getBuffer().array(), getOffset(accessor, tIndex, 1)); // field 1 carries the serialized NodeWritable
+ localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
+ localUniNode.getFFList().appendList(readNode.getFFList());
+ localUniNode.getFRList().appendList(readNode.getFRList());
+ localUniNode.getRFList().appendList(readNode.getRFList());
+ localUniNode.getRRList().appendList(readNode.getRRList());
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("partial result method should not be called");
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ NodeWritable localUniNode = (NodeWritable) state.state;
+ try {
+ fieldOutput.write(localUniNode.getByteArray(), localUniNode.getStartOffset(), localUniNode.getLength());
+
+ tupleBuilder.addFieldEndOffset();
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ };
+ }
+
+}
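
The factory above implements Hyracks' group-by lifecycle: createAggregateStates() allocates per-group state, init() seeds it from the group's first tuple, aggregate() folds in each further tuple, and outputFinalResult() serializes the state. A minimal stand-in for that lifecycle with plain collections (class and method names are illustrative):

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the init/aggregate/outputFinalResult sequence:
// one accumulator per group key, folded over that group's tuples in order.
class GroupAccumulator<T> {
    private final List<T> state = new ArrayList<T>();

    void init(T first) {     // like init(): reset state, absorb first tuple
        state.clear();
        state.add(first);
    }

    void aggregate(T next) { // like aggregate(): absorb each later tuple
        state.add(next);
    }

    List<T> outputFinal() {  // like outputFinalResult(): emit the final state
        return state;
    }
}
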
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
new file mode 100644
index 0000000..2cad0ca
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
+ private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(MergeKmerAggregateFactory.class);
+
+ private final int readLength;
+ private final int kmerSize;
+
+ public MergeKmerAggregateFactory(int readlength, int k) {
+ this.readLength = readlength;
+ this.kmerSize = k;
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ final int frameSize = ctx.getFrameSize();
+ return new IAggregatorDescriptor() {
+
+// private PositionReference position = new PositionReference();
+
+ private NodeWritable readNode = new NodeWritable(kmerSize);
+            private HashSet<String> set = new HashSet<String>(); // keyed on toString(), since the Writable temps below are mutable and reused
+ private PositionListWritable uniNodeIdList = new PositionListWritable();
+ private KmerListWritable uniEdgeList = new KmerListWritable(kmerSize);
+ private KmerBytesWritable tempKmer = new KmerBytesWritable(kmerSize);
+ private PositionWritable tempPos = new PositionWritable();
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new NodeWritable());
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ NodeWritable localUniNode = (NodeWritable) state.state;
+ localUniNode.reset(kmerSize);
+ int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
+ 1); offset += PositionReference.LENGTH) {
+ readNode.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+ localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
+ localUniNode.getFFList().appendList(readNode.getFFList());
+ localUniNode.getFRList().appendList(readNode.getFRList());
+ localUniNode.getRFList().appendList(readNode.getRFList());
+ localUniNode.getRRList().appendList(readNode.getRRList());
+ }
+                // add an empty field so the caller sees a complete tuple; the payload is emitted in outputFinalResult()
+ tupleBuilder.addFieldEndOffset();
+ }
+
+ @Override
+ public void reset() {
+
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ NodeWritable localUniNode = (NodeWritable) state.state;
+ int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
+ 1); offset += PositionReference.LENGTH) {
+                    readNode.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+ localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
+ localUniNode.getFFList().appendList(readNode.getFFList());
+ localUniNode.getFRList().appendList(readNode.getFRList());
+ localUniNode.getRFList().appendList(readNode.getRFList());
+ localUniNode.getRRList().appendList(readNode.getRRList());
+ }
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("partial result method should not be called");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ NodeWritable localUniNode = (NodeWritable) state.state;
+                set.clear(); // the set is reused across groups; stale entries would wrongly suppress appends
+                uniNodeIdList.reset();
+ for(java.util.Iterator<PositionWritable> iter = localUniNode.getNodeIdList().iterator(); iter.hasNext();){
+ tempPos.set(iter.next());
+                    if(set.add(tempPos.toString()))
+ uniNodeIdList.append(tempPos);
+ }
+ localUniNode.getNodeIdList().reset();
+ localUniNode.getNodeIdList().set(uniNodeIdList);
+ uniEdgeList.reset();
+ for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getFFList().iterator(); iter.hasNext();){
+ tempKmer.set(iter.next());
+                    if(set.add(tempKmer.toString()))
+ uniEdgeList.append(tempKmer);
+ }
+ localUniNode.getFFList().reset();
+ localUniNode.getFFList().set(uniEdgeList);
+
+ uniEdgeList.reset();
+ for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getFRList().iterator(); iter.hasNext();){
+ tempKmer.set(iter.next());
+                    if(set.add(tempKmer.toString()))
+ uniEdgeList.append(tempKmer);
+ }
+ localUniNode.getFRList().reset();
+ localUniNode.getFRList().set(uniEdgeList);
+
+ uniEdgeList.reset();
+ for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getRFList().iterator(); iter.hasNext();){
+ tempKmer.set(iter.next());
+                    if(set.add(tempKmer.toString()))
+ uniEdgeList.append(tempKmer);
+ }
+ localUniNode.getRFList().reset();
+ localUniNode.getRFList().set(uniEdgeList);
+
+ uniEdgeList.reset();
+ for(java.util.Iterator<KmerBytesWritable> iter = localUniNode.getRRList().iterator(); iter.hasNext();){
+ tempKmer.set(iter.next());
+                    if(set.add(tempKmer.toString()))
+ uniEdgeList.append(tempKmer);
+ }
+ localUniNode.getRRList().reset();
+ localUniNode.getRRList().set(uniEdgeList);
+
+ try {
+                    if (localUniNode.getLength() > frameSize / 2) {
+                        LOG.warn("MergeKmer: output data size is too big: " + localUniNode.getLength());
+ }
+ fieldOutput.write(localUniNode.getByteArray(), localUniNode.getStartOffset(), localUniNode.getLength());
+ tupleBuilder.addFieldEndOffset();
+
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ };
+
+ }
+}
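
outputFinalResult() above dedupes node ids and edge kmers through a HashSet; because the Writable temps are mutable and reused, the set keys on a stable value (their string form). The underlying pattern, sketched with immutable Strings (names illustrative):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Order-preserving dedup: Set.add() returns false for duplicates, so each
// element is kept only the first time it is seen.
class Dedup {
    static List<String> unique(List<String> items) {
        Set<String> seen = new HashSet<String>(); // fresh per list
        List<String> out = new ArrayList<String>();
        for (String item : items) {
            if (seen.add(item)) {
                out.add(item);
            }
        }
        return out;
    }
}
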
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
index abfff00..96e31b8 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenBrujinGraph.java
@@ -24,18 +24,53 @@
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
-import edu.uci.ics.genomix.hyracks.job.JobGen;
-import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.AggregateKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.newgraph.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
@SuppressWarnings("deprecation")
@@ -85,7 +120,164 @@
}
initJobConfiguration(scheduler);
}
-
+
+    private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+            IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
+            ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+            IPointableFactory pointable, RecordDescriptor outRec) {
+        return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, normalizer,
+                aggregator, merger, outRec, new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(pointable) }),
+                        tableSize), true);
+    }
+
+    private Object[] generateAggregateDescriptorByType(JobSpecification jobSpec, int[] keyFields,
+            IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
+            ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+            IPointableFactory pointable, RecordDescriptor combineRec, RecordDescriptor finalRec)
+ throws HyracksDataException {
+
+ Object[] obj = new Object[3];
+
+ switch (groupbyType) {
+ case EXTERNAL:
+ obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+                        combineRec);
+ obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
+ obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
+ finalRec);
+ break;
+ case PRECLUSTER:
+ default:
+
+ obj[0] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, aggregator,
+                        combineRec);
+ obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
+ obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
+ finalRec);
+ jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ break;
+ }
+ return obj;
+ }
+
+ public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+ try {
+ InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+ .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+
+ return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
+                    hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength,
+                            kmerSize));
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
+ IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
+ jobSpec.connect(conn, preOp, 0, nextOp, 0);
+ }
+
+ public AbstractOperatorDescriptor generateGroupbyKmerJob(JobSpecification jobSpec,
+ AbstractOperatorDescriptor readOperator) throws HyracksDataException {
+ int[] keyFields = new int[] { 0 }; // the id of grouped key
+
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+ ReadsKeyValueParserFactory.readKmerOutputRec);
+ connectOperators(jobSpec, readOperator, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+ jobSpec));
+
+ RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null });
+ jobSpec.setFrameSize(frameSize);
+
+        Object[] objs = generateAggregateDescriptorByType(jobSpec, keyFields,
+                new AggregateKmerAggregateFactory(readLength, kmerSize), new MergeKmerAggregateFactory(readLength, kmerSize),
+                new KmerHashPartitioncomputerFactory(), new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
+ AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+ logDebug("LocalKmerGroupby Operator");
+ connectOperators(jobSpec, sorter, ncNodeNames, kmerLocalAggregator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+
+ logDebug("CrossKmerGroupby Operator");
+ IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
+ AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+ connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
+ return kmerCrossAggregator;
+ }
+
+
+    public AbstractOperatorDescriptor generateKmerWriterOperator(JobSpecification jobSpec,
+ AbstractOperatorDescriptor kmerCrossAggregator) throws HyracksException {
+ // Output Kmer
+ ITupleWriterFactory kmerWriter = null;
+ switch (outputFormat) {
+ case TEXT:
+ kmerWriter = new KMerTextWriterFactory(kmerSize);
+ break;
+ case BINARY:
+ default:
+ kmerWriter = new KMerSequenceWriterFactory(hadoopJobConfFactory.getConf());
+ break;
+ }
+ logDebug("WriteOperator");
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), kmerWriter);
+ connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return writeKmerOperator;
+ }
+
+    public AbstractOperatorDescriptor generateNodeWriterOperator(JobSpecification jobSpec,
+ AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
+ ITupleWriterFactory nodeWriter = null;
+ switch (outputFormat) {
+ case TEXT:
+ nodeWriter = new NodeTextWriterFactory(kmerSize);
+ break;
+ case BINARY:
+ default:
+ nodeWriter = new NodeSequenceWriterFactory(hadoopJobConfFactory.getConf());
+ break;
+ }
+ logDebug("WriteOperator");
+ // Output Node
+ HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), nodeWriter);
+ connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return writeNodeOperator;
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Group by Kmer");
+ AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+ // logDebug("Write kmer to result");
+ // generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+
+ logDebug("Write node to result");
+        lastOperator = generateNodeWriterOperator(jobSpec, lastOperator);
+
+ jobSpec.addRoot(lastOperator);
+ return jobSpec;
+ }
+
protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
Configuration conf = confFactory.getConf();
readLength = conf.getInt(GenomixJobConf.READ_LENGTH, GenomixJobConf.DEFAULT_READLEN);
@@ -131,30 +323,5 @@
LOG.info("Frame limit" + frameLimits);
LOG.info("Frame kmerByteSize" + frameSize);
}
-
- public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
- try {
- InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
- .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
- return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
- hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength,
- kmerSize, bGenerateReversedKmer));
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
- IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
- jobSpec.connect(conn, preOp, 0, nextOp, 0);
- }
-
- @Override
- public JobSpecification generateJob() throws HyracksException {
- // TODO Auto-generated method stub
- return null;
- }
}
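
generateGroupbyKmerJob above wires sort, local group-by, an M-to-N repartitioning connector, and a global group-by; the connector routes every tuple for one kmer to the same partition by hashing the key, so the global aggregator sees the whole group. A sketch of that routing idea (KmerHashPartitioncomputerFactory's real hash is not shown in this diff, so the hash below is illustrative):

// Route a serialized key to one of nPartitions consumers; tuples with equal
// keys always land on the same partition.
static int partitionFor(byte[] keyBytes, int nPartitions) {
    int h = 1;
    for (byte b : keyBytes) {
        h = 31 * h + b;                            // simple rolling hash
    }
    return (h & Integer.MAX_VALUE) % nPartitions;  // non-negative index
}
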
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCreateKmerInfo.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCreateKmerInfo.java
new file mode 100644
index 0000000..35d053a
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCreateKmerInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.genomix.hyracks.newgraph.job;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenCreateKmerInfo extends JobGenBrujinGraph {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public JobGenCreateKmerInfo(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) throws HyracksDataException {
+ super(job, scheduler, ncMap, numPartitionPerMachine);
+ }
+
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Group by Kmer");
+ AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+ logDebug("Write kmer to result");
+        lastOperator = generateKmerWriterOperator(jobSpec, lastOperator);
+ jobSpec.addRoot(lastOperator);
+
+ return jobSpec;
+ }
+}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index bd761a5..4ce59a0 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -75,8 +75,8 @@
@Test
public void TestAll() throws Exception {
- TestReader();
-// TestGroupbyKmer();
+// TestReader();
+ TestGroupbyKmer();
// TestMapKmerToRead();
// TestGroupByReadID();
// TestEndToEnd();