Merge branch 'genomix/fullstack_genomix' into nanzhang/hyracks_genomix
Conflicts:
genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java
deleted file mode 100644
index 3826f86..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package edu.uci.ics.genomix.oldtype;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import edu.uci.ics.genomix.type.KmerListWritable;
-import edu.uci.ics.genomix.type.PositionWritable;
-
-public class IntermediateNodeWritable implements WritableComparable<IntermediateNodeWritable>, Serializable{
-
- private static final long serialVersionUID = 1L;
- public static final IntermediateNodeWritable EMPTY_NODE = new IntermediateNodeWritable();
-
- private KmerListWritable forwardForwardList;
- private KmerListWritable forwardReverseList;
- private KmerListWritable reverseForwardList;
- private KmerListWritable reverseReverseList;
- private PositionWritable nodeId;
-
- public IntermediateNodeWritable(){
- forwardForwardList = new KmerListWritable();
- forwardReverseList = new KmerListWritable();
- reverseForwardList = new KmerListWritable();
- reverseReverseList = new KmerListWritable();
- nodeId = new PositionWritable();
- }
-
- public IntermediateNodeWritable(KmerListWritable FFList, KmerListWritable FRList,
- KmerListWritable RFList, KmerListWritable RRList, PositionWritable uniqueKey) {
- this();
- set(FFList, FRList, RFList, RRList, uniqueKey);
- }
-
- public void set(IntermediateNodeWritable node){
- set(node.forwardForwardList, node.forwardReverseList, node.reverseForwardList,
- node.reverseReverseList, node.nodeId);
- }
-
- public void set(KmerListWritable FFList, KmerListWritable FRList,
- KmerListWritable RFList, KmerListWritable RRList, PositionWritable uniqueKey) {
- this.forwardForwardList.set(FFList);
- this.forwardReverseList.set(FRList);
- this.reverseForwardList.set(RFList);
- this.reverseReverseList.set(RRList);
- this.nodeId.set(uniqueKey);
- }
-
- public KmerListWritable getFFList() {
- return forwardForwardList;
- }
-
- public void setFFList(KmerListWritable forwardForwardList) {
- this.forwardForwardList.set(forwardForwardList);
- }
-
- public KmerListWritable getFRList() {
- return forwardReverseList;
- }
-
- public void setFRList(KmerListWritable forwardReverseList) {
- this.forwardReverseList.set(forwardReverseList);
- }
-
- public KmerListWritable getRFList() {
- return reverseForwardList;
- }
-
- public void setRFList(KmerListWritable reverseForwardList) {
- this.reverseForwardList.set(reverseForwardList);
- }
-
- public KmerListWritable getRRList() {
- return reverseReverseList;
- }
-
- public void setRRList(KmerListWritable reverseReverseList) {
- this.reverseReverseList.set(reverseReverseList);
- }
-
- public PositionWritable getNodeId() {
- return nodeId;
- }
-
- public void setNodeId(PositionWritable nodeId) {
- this.nodeId.set(nodeId);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.forwardForwardList.readFields(in);
- this.forwardReverseList.readFields(in);
- this.reverseForwardList.readFields(in);
- this.reverseReverseList.readFields(in);
- this.nodeId.readFields(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- this.forwardForwardList.write(out);
- this.forwardReverseList.write(out);
- this.reverseForwardList.write(out);
- this.reverseReverseList.write(out);
- this.nodeId.write(out);
- }
-
- @Override
- public int compareTo(IntermediateNodeWritable other) {
- // TODO Auto-generated method stub
- return this.nodeId.compareTo(other.nodeId);
- }
-
- @Override
- public int hashCode() {
- return this.nodeId.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof IntermediateNodeWritable) {
- IntermediateNodeWritable nw = (IntermediateNodeWritable) o;
- return (this.forwardForwardList.equals(nw.forwardForwardList)
- && this.forwardReverseList.equals(nw.forwardReverseList)
- && this.reverseForwardList.equals(nw.reverseForwardList)
- && this.reverseReverseList.equals(nw.reverseReverseList) && (this.nodeId.equals(nw.nodeId)));
- }
- return false;
- }
-
- @Override
- public String toString() {
- StringBuilder sbuilder = new StringBuilder();
- sbuilder.append('(');
- sbuilder.append(nodeId.toString()).append('\t');
- sbuilder.append(forwardForwardList.toString()).append('\t');
- sbuilder.append(forwardReverseList.toString()).append('\t');
- sbuilder.append(reverseForwardList.toString()).append('\t');
- sbuilder.append(reverseReverseList.toString()).append('\t').append(')');
- return sbuilder.toString();
- }
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
index 96e1cc4..95bcaf8 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerBytesWritable.java
@@ -60,7 +60,7 @@
* Initial Kmer space by kmerlength
*
* @param k
- * kmerlength
+ * kmerlength
*/
public KmerBytesWritable(int k) {
this.kmerlength = k;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
new file mode 100644
index 0000000..d9efd75
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.genomix.data.Marshal;
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class AggregateKmerAggregateFactory implements IAggregatorDescriptorFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private PositionListWritable nodeIdList = new PositionListWritable();
+ private KmerListWritable forwardForwardList = new KmerListWritable(kmerSize);// TODO: kmerSize is not defined in this factory yet; it still needs to be passed in (e.g. through the constructor)
+ private KmerListWritable forwardReverseList = new KmerListWritable(kmerSize);
+ private KmerListWritable reverseForwardList = new KmerListWritable(kmerSize);
+ private KmerListWritable reverseReverseList = new KmerListWritable(kmerSize);
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ return new IAggregatorDescriptor() {
+ private NodeWritable nodeAggreter = new NodeWritable();// TODO: can this be declared outside the anonymous class?
+
+ protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
+ int tupleOffset = accessor.getTupleStartOffset(tIndex);
+ int fieldStart = accessor.getFieldStartOffset(tIndex, fieldId);
+ int offset = tupleOffset + fieldStart + accessor.getFieldSlotsLength();
+ return offset;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new NodeWritable());
+ }
+
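+ /*
+  * Field layout assumed by init()/aggregate() below (inferred from the offsets they read):
+  * field 0 = group key, field 1 = kmer, field 2 = nodeIdList, fields 3/4 = FF count + list,
+  * fields 5/6 = FR, fields 7/8 = RF, fields 9/10 = RR.
+  */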
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ NodeWritable inputVal = (NodeWritable) state.state;
+ inputVal.reset(kmerSize);
+ nodeIdList.reset();
+ forwardForwardList.reset(kmerSize);
+ forwardReverseList.reset(kmerSize);
+ reverseForwardList.reset(kmerSize);
+ reverseReverseList.reset(kmerSize);
+
+ kmer.set(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));// TODO: confirm the kmer value really starts at field 1 (field 0 is the group key)
+ nodeIdList.setNewReference(1, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 2));
+ int ffCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 3));
+ forwardForwardList.setNewReference(ffCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 4));
+ int frCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 5));
+ forwardReverseList.setNewReference(frCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 6));
+ int rfCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 7));
+ reverseForwardList.setNewReference(rfCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 8));
+ int rrCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 9));
+ reverseReverseList.setNewReference(rrCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 10));
+ nodeAggreter.set(nodeIdList, forwardForwardList, forwardReverseList, reverseForwardList, reverseReverseList, kmer);
+
+ inputVal.getKmer().set(kmer);
+ inputVal.getNodeIdList().appendList(nodeIdList);
+ inputVal.getFFList().appendList(forwardForwardList);
+ inputVal.getFRList().appendList(forwardReverseList);
+ inputVal.getRFList().appendList(reverseForwardList);
+ inputVal.getRRList().appendList(reverseReverseList);
+
+ // make an empty field
+ tupleBuilder.addFieldEndOffset();// TODO: clarify why this empty field is required here
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ NodeWritable inputVal = (NodeWritable) state.state;
+ kmer.set(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));// TODO: confirm the kmer value really starts at field 1 (field 0 is the group key)
+ nodeIdList.setNewReference(1, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 2));
+ int ffCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 3));
+ forwardForwardList.setNewReference(ffCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 4));
+ int frCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 5));
+ forwardReverseList.setNewReference(frCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 6));
+ int rfCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 7));
+ reverseForwardList.setNewReference(rfCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 8));
+ int rrCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 9));
+ reverseReverseList.setNewReference(rrCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 10));
+ nodeAggreter.set(nodeIdList, forwardForwardList, forwardReverseList, reverseForwardList, reverseReverseList, kmer);
+
+ inputVal.getKmer().set(kmer);
+ inputVal.getNodeIdList().appendList(nodeIdList);
+ inputVal.getFFList().appendList(forwardForwardList);
+ inputVal.getFRList().appendList(forwardReverseList);
+ inputVal.getRFList().appendList(reverseForwardList);
+ inputVal.getRRList().appendList(reverseReverseList);
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("partial result method should not be called");
+ }
+
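+ /*
+  * Sketch of the intended final output tuple (not verified against the downstream reader):
+  * the kmer bytes, the nodeIdList, and then for each of FF/FR/RF/RR a position count
+  * followed by the serialized list, mirroring the input layout consumed by init()/aggregate().
+  */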
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ NodeWritable inputVal = (NodeWritable) state.state;
+ try {
+// fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+
+ tupleBuilder.addFieldEndOffset();// TODO: clarify why a field end offset is added before writing the kmer field
+// tupleBuilder.reset();
+ fieldOutput.write(inputVal.getKmer().getBytes(), inputVal.getKmer().getOffset(), inputVal.getKmer().getLength());
+
+ tupleBuilder.addField(inputVal.getNodeIdList().getByteArray(), inputVal.getNodeIdList().getStartOffset(), inputVal.getNodeIdList().getLength());
+
+ tupleBuilder.getDataOutput().writeInt(inputVal.getFFList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(inputVal.getFFList().getByteArray(), inputVal.getFFList().getStartOffset(), inputVal.getFFList()
+ .getLength());
+
+ tupleBuilder.getDataOutput().writeInt(inputVal.getFRList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(inputVal.getFRList().getByteArray(), inputVal.getFRList().getStartOffset(), inputVal.getFRList()
+ .getLength());
+
+ tupleBuilder.getDataOutput().writeInt(inputVal.getRFList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(inputVal.getRFList().getByteArray(), inputVal.getRFList().getStartOffset(), inputVal.getRFList()
+ .getLength());
+
+ tupleBuilder.getDataOutput().writeInt(inputVal.getRRList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(inputVal.getRRList().getByteArray(), inputVal.getRRList().getStartOffset(), inputVal.getRRList()
+ .getLength());
+
+/* if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new IllegalStateException(
+ "Failed to copy an record into a frame: the record kmerByteSize is too large.");
+ }
+ }*/
+
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ };
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/Driver.java
new file mode 100644
index 0000000..76ca95a
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/Driver.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.hyracks.job.JobGen;
+import edu.uci.ics.genomix.hyracks.job.JobGenBrujinGraph;
+import edu.uci.ics.genomix.hyracks.job.JobGenCheckReader;
+import edu.uci.ics.genomix.hyracks.job.JobGenCreateKmerInfo;
+import edu.uci.ics.genomix.hyracks.job.JobGenGroupbyReadID;
+import edu.uci.ics.genomix.hyracks.job.JobGenMapKmerToRead;
+import edu.uci.ics.genomix.hyracks.job.JobGenUnMerged;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class Driver {
+ public static enum Plan {
+ CHECK_KMERREADER,
+ OUTPUT_KMERHASHTABLE,
+ OUTPUT_MAP_KMER_TO_READ,
+ OUTPUT_GROUPBY_READID,
+ BUILD_DEBRUJIN_GRAPH,
+ BUILD_UNMERGED_GRAPH,
+ }
+
+ private static final String IS_PROFILING = "genomix.driver.profiling";
+ private static final String CPARTITION_PER_MACHINE = "genomix.driver.duplicate.num";
+ private static final Log LOG = LogFactory.getLog(Driver.class);
+ private JobGen jobGen;
+ private boolean profiling;
+
+ private int numPartitionPerMachine;
+
+ private IHyracksClientConnection hcc;
+ private Scheduler scheduler;
+
+ public Driver(String ipAddress, int port, int numPartitionPerMachine) throws HyracksException {
+ try {
+ hcc = new HyracksConnection(ipAddress, port);
+ scheduler = new Scheduler(hcc.getNodeControllerInfos());
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ this.numPartitionPerMachine = numPartitionPerMachine;
+ }
+
+ public void runJob(GenomixJobConf job) throws HyracksException {
+ runJob(job, Plan.BUILD_DEBRUJIN_GRAPH, false);
+ }
+
+ public void runJob(GenomixJobConf job, Plan planChoice, boolean profiling) throws HyracksException {
+ /** add hadoop configurations */
+ URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+ job.addResource(hadoopCore);
+ URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+ job.addResource(hadoopMapRed);
+ URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+ job.addResource(hadoopHdfs);
+
+ LOG.info("job started");
+ long start = System.currentTimeMillis();
+ long end = start;
+ long time = 0;
+
+ this.profiling = profiling;
+ try {
+ Map<String, NodeControllerInfo> ncMap = hcc.getNodeControllerInfos();
+ LOG.info("ncmap:" + ncMap.size() + " " + ncMap.keySet().toString());
+ switch (planChoice) {
+ case BUILD_DEBRUJIN_GRAPH:
+ default:
+ jobGen = new JobGenBrujinGraph(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
+ case OUTPUT_KMERHASHTABLE:
+ jobGen = new JobGenCreateKmerInfo(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
+ case OUTPUT_MAP_KMER_TO_READ:
+ jobGen = new JobGenMapKmerToRead(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
+ case OUTPUT_GROUPBY_READID:
+ jobGen = new JobGenGroupbyReadID(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
+ case CHECK_KMERREADER:
+ jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
+ break;
+ case BUILD_UNMERGED_GRAPH:
+ jobGen = new JobGenUnMerged(job, scheduler, ncMap, numPartitionPerMachine);
+ }
+
+ start = System.currentTimeMillis();
+ run(jobGen);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("result writing finished " + time + "ms");
+ LOG.info("job finished");
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ private void run(JobGen jobGen) throws Exception {
+ try {
+ JobSpecification createJob = jobGen.generateJob();
+ execute(createJob);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ private void execute(JobSpecification job) throws Exception {
+ job.setUseConnectorPolicyForScheduling(false);
+ JobId jobId = hcc
+ .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ hcc.waitForCompletion(jobId);
+ }
+
+ public static void main(String[] args) throws Exception {
+ GenomixJobConf jobConf = new GenomixJobConf();
+ String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
+ if (otherArgs.length < 4) {
+ System.err.println("Need <serverIP> <port> <input> <output>");
+ System.exit(-1);
+ }
+ String ipAddress = otherArgs[0];
+ int port = Integer.parseInt(otherArgs[1]);
+ int numOfDuplicate = jobConf.getInt(CPARTITION_PER_MACHINE, 2);
+ boolean bProfiling = jobConf.getBoolean(IS_PROFILING, true);
+ // FileInputFormat.setInputPaths(job, otherArgs[2]);
+ {
+ @SuppressWarnings("deprecation")
+ Path path = new Path(jobConf.getWorkingDirectory(), otherArgs[2]);
+ jobConf.set("mapred.input.dir", path.toString());
+
+ @SuppressWarnings("deprecation")
+ Path outputDir = new Path(jobConf.getWorkingDirectory(), otherArgs[3]);
+ jobConf.set("mapred.output.dir", outputDir.toString());
+ }
+ // FileInputFormat.addInputPath(jobConf, new Path(otherArgs[2]));
+ // FileOutputFormat.setOutputPath(job, new Path(otherArgs[3]));
+ Driver driver = new Driver(ipAddress, port, numOfDuplicate);
+ driver.runJob(jobConf, Plan.BUILD_DEBRUJIN_GRAPH, bProfiling);
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/GenomixJobConf.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/GenomixJobConf.java
new file mode 100644
index 0000000..ecd95d5
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/GenomixJobConf.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+
+@SuppressWarnings("deprecation")
+public class GenomixJobConf extends JobConf {
+
+ public static final String JOB_NAME = "genomix";
+
+ /** Kmers length */
+ public static final String KMER_LENGTH = "genomix.kmerlen";
+ /** Read length */
+ public static final String READ_LENGTH = "genomix.readlen";
+ /** Frame Size */
+ public static final String FRAME_SIZE = "genomix.framesize";
+ /** Frame Limit, hyracks need */
+ public static final String FRAME_LIMIT = "genomix.framelimit";
+ /** Table Size, hyracks need */
+ public static final String TABLE_SIZE = "genomix.tablesize";
+ /** Groupby types */
+ public static final String GROUPBY_TYPE = "genomix.graph.groupby.type";
+ /** Graph outputformat */
+ public static final String OUTPUT_FORMAT = "genomix.graph.output";
+ /** Get reversed Kmer Sequence */
+ public static final String REVERSED_KMER = "genomix.kmer.reversed";
+
+ /** Configurations used by the hybrid groupby function in the graph build phase */
+ public static final String GROUPBY_HYBRID_INPUTSIZE = "genomix.graph.groupby.hybrid.inputsize";
+ public static final String GROUPBY_HYBRID_INPUTKEYS = "genomix.graph.groupby.hybrid.inputkeys";
+ public static final String GROUPBY_HYBRID_RECORDSIZE_SINGLE = "genomix.graph.groupby.hybrid.recordsize.single";
+ public static final String GROUPBY_HYBRID_RECORDSIZE_CROSS = "genomix.graph.groupby.hybrid.recordsize.cross";
+ public static final String GROUPBY_HYBRID_HASHLEVEL = "genomix.graph.groupby.hybrid.hashlevel";
+
+ public static final int DEFAULT_KMERLEN = 21;
+ public static final int DEFAULT_READLEN = 124;
+ public static final int DEFAULT_FRAME_SIZE = 128 * 1024;
+ public static final int DEFAULT_FRAME_LIMIT = 4096;
+ public static final int DEFAULT_TABLE_SIZE = 10485767;
+ public static final long DEFAULT_GROUPBY_HYBRID_INPUTSIZE = 154000000L;
+ public static final long DEFAULT_GROUPBY_HYBRID_INPUTKEYS = 38500000L;
+ public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_SINGLE = 9;
+ public static final int DEFAULT_GROUPBY_HYBRID_HASHLEVEL = 1;
+ public static final int DEFAULT_GROUPBY_HYBRID_RECORDSIZE_CROSS = 13;
+
+ public static final boolean DEFAULT_REVERSED = true;
+
+ public static final String JOB_PLAN_GRAPHBUILD = "graphbuild";
+ public static final String JOB_PLAN_GRAPHSTAT = "graphstat";
+
+ public static final String GROUPBY_TYPE_HYBRID = "hybrid";
+ public static final String GROUPBY_TYPE_EXTERNAL = "external";
+ public static final String GROUPBY_TYPE_PRECLUSTER = "precluster";
+ public static final String OUTPUT_FORMAT_BINARY = "binary";
+ public static final String OUTPUT_FORMAT_TEXT = "text";
+
+ public GenomixJobConf() throws IOException {
+ super(new Configuration());
+ }
+
+ public GenomixJobConf(Configuration conf) throws IOException {
+ super(conf);
+ }
+
+ /**
+ * Set the kmer length
+ *
+ * @param kmerlength
+ * the desired kmer length
+ */
+ final public void setKmerLength(int kmerlength) {
+ setInt(KMER_LENGTH, kmerlength);
+ }
+
+ final public void setFrameSize(int frameSize) {
+ setInt(FRAME_SIZE, frameSize);
+ }
+
+ final public void setFrameLimit(int frameLimit) {
+ setInt(FRAME_LIMIT, frameLimit);
+ }
+
+ final public void setTableSize(int tableSize) {
+ setInt(TABLE_SIZE, tableSize);
+ }
+
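+ /*
+  * Usage sketch (illustrative only):
+  *   GenomixJobConf job = new GenomixJobConf();
+  *   job.setKmerLength(GenomixJobConf.DEFAULT_KMERLEN);
+  *   job.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+  */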
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGen.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGen.java
new file mode 100644
index 0000000..0ffdef2
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGen.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+
+public abstract class JobGen implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ protected final ConfFactory confFactory;
+ protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+
+ public JobGen(GenomixJobConf job) throws HyracksDataException {
+ this.confFactory = new ConfFactory(job);
+ }
+
+ public abstract JobSpecification generateJob() throws HyracksException;
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGenBrujinGraph.java
new file mode 100644
index 0000000..e280a7c
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/JobGenBrujinGraph.java
@@ -0,0 +1,384 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerHashPartitioncomputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.KmerNormarlizedComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.accessors.ReadIDPartitionComputerFactory;
+import edu.uci.ics.genomix.hyracks.data.primitive.KmerPointable;
+import edu.uci.ics.genomix.hyracks.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
+import edu.uci.ics.genomix.hyracks.dataflow.ReadsKeyValueParserFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.AggregateReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeKmerAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.aggregators.MergeReadIDAggregateFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.KMerTextWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeSequenceWriterFactory;
+import edu.uci.ics.genomix.hyracks.dataflow.io.NodeTextWriterFactory;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+@SuppressWarnings("deprecation")
+public class JobGenBrujinGraph extends JobGen {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public enum GroupbyType {
+ EXTERNAL,
+ PRECLUSTER,
+ HYBRIDHASH,
+ }
+
+ public enum OutputFormat {
+ TEXT,
+ BINARY,
+ }
+
+ protected ConfFactory hadoopJobConfFactory;
+ protected static final Log LOG = LogFactory.getLog(JobGenBrujinGraph.class);
+ protected String[] ncNodeNames;
+ protected String[] readSchedule;
+
+ protected int readLength;
+ protected int kmerSize;
+ protected int frameLimits;
+ protected int frameSize;
+ protected int tableSize;
+ protected GroupbyType groupbyType;
+ protected OutputFormat outputFormat;
+ protected boolean bGenerateReversedKmer;
+
+ protected void logDebug(String status) {
+ LOG.debug(status + " nc nodes:" + ncNodeNames.length);
+ }
+
+ public JobGenBrujinGraph(GenomixJobConf job, Scheduler scheduler, final Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) throws HyracksDataException {
+ super(job);
+ String[] nodes = new String[ncMap.size()];
+ ncMap.keySet().toArray(nodes);
+ ncNodeNames = new String[nodes.length * numPartitionPerMachine];
+ for (int i = 0; i < numPartitionPerMachine; i++) {
+ System.arraycopy(nodes, 0, ncNodeNames, i * nodes.length, nodes.length);
+ }
+ initJobConfiguration(scheduler);
+ }
+
+ private ExternalGroupOperatorDescriptor newExternalGroupby(JobSpecification jobSpec, int[] keyFields,
+ IAggregatorDescriptorFactory aggeragater, IAggregatorDescriptorFactory merger,
+ ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+ IPointableFactory pointable, RecordDescriptor outRed) {
+ return new ExternalGroupOperatorDescriptor(jobSpec, keyFields, frameLimits,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, normalizer,
+ aggeragater, merger, outRed, new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(pointable) }),
+ tableSize), true);
+ }
+
+ private Object[] generateAggeragateDescriptorbyType(JobSpecification jobSpec, int[] keyFields,
+ IAggregatorDescriptorFactory aggregator, IAggregatorDescriptorFactory merger,
+ ITuplePartitionComputerFactory partition, INormalizedKeyComputerFactory normalizer,
+ IPointableFactory pointable, RecordDescriptor combineRed, RecordDescriptor finalRec)
+ throws HyracksDataException {
+
+ Object[] obj = new Object[3];
+
+ switch (groupbyType) {
+ case EXTERNAL:
+ obj[0] = newExternalGroupby(jobSpec, keyFields, aggregator, merger, partition, normalizer, pointable,
+ combineRed);
+ obj[1] = new MToNPartitioningConnectorDescriptor(jobSpec, partition);
+ obj[2] = newExternalGroupby(jobSpec, keyFields, merger, merger, partition, normalizer, pointable,
+ finalRec);
+ break;
+ case PRECLUSTER:
+ default:
+
+ obj[0] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, aggregator,
+ combineRed);
+ obj[1] = new MToNPartitioningMergingConnectorDescriptor(jobSpec, partition, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) });
+ obj[2] = new PreclusteredGroupOperatorDescriptor(jobSpec, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(pointable) }, merger,
+ finalRec);
+ jobSpec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ break;
+ }
+ return obj;
+ }
+
+ public HDFSReadOperatorDescriptor createHDFSReader(JobSpecification jobSpec) throws HyracksDataException {
+ try {
+ InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+ .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+
+ return new HDFSReadOperatorDescriptor(jobSpec, ReadsKeyValueParserFactory.readKmerOutputRec,
+ hadoopJobConfFactory.getConf(), splits, readSchedule, new ReadsKeyValueParserFactory(readLength,
+ kmerSize, bGenerateReversedKmer));
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public static void connectOperators(JobSpecification jobSpec, IOperatorDescriptor preOp, String[] preNodes,
+ IOperatorDescriptor nextOp, String[] nextNodes, IConnectorDescriptor conn) {
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, preOp, preNodes);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, nextOp, nextNodes);
+ jobSpec.connect(conn, preOp, 0, nextOp, 0);
+ }
+
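+ /*
+  * Group-by-kmer pipeline built below: an external sort on the kmer key, a local
+  * (per-partition) aggregate, an M-to-N repartitioning connector, and a final
+  * cross-partition merge aggregate, with the operator/connector triple chosen by
+  * generateAggeragateDescriptorbyType().
+  */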
+ public AbstractOperatorDescriptor generateGroupbyKmerJob(JobSpecification jobSpec,
+ AbstractOperatorDescriptor readOperator) throws HyracksDataException {
+ int[] keyFields = new int[] { 0 }; // the id of grouped key
+
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(KmerPointable.FACTORY) },
+ ReadsKeyValueParserFactory.readKmerOutputRec);
+ connectOperators(jobSpec, readOperator, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+ jobSpec));
+
+ RecordDescriptor combineKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null, null, null, null, null, null, null});
+ jobSpec.setFrameSize(frameSize);
+
+ Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateKmerAggregateFactory(),
+ new MergeKmerAggregateFactory(), new KmerHashPartitioncomputerFactory(),
+ new KmerNormarlizedComputerFactory(), KmerPointable.FACTORY, combineKmerOutputRec, combineKmerOutputRec);
+ AbstractOperatorDescriptor kmerLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+ logDebug("LocalKmerGroupby Operator");
+ connectOperators(jobSpec, sorter, ncNodeNames, kmerLocalAggregator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+
+ logDebug("CrossKmerGroupby Operator");
+ IConnectorDescriptor kmerConnPartition = (IConnectorDescriptor) objs[1];
+ AbstractOperatorDescriptor kmerCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+ connectOperators(jobSpec, kmerLocalAggregator, ncNodeNames, kmerCrossAggregator, ncNodeNames, kmerConnPartition);
+ return kmerCrossAggregator;
+ }
+
+/* public AbstractOperatorDescriptor generateMapperFromKmerToRead(JobSpecification jobSpec,
+ AbstractOperatorDescriptor kmerCrossAggregator) {
+ // Map (Kmer, {(ReadID,PosInRead),...}) into
+ // (ReadID,PosInRead,{OtherPosition,...},Kmer)
+
+ AbstractOperatorDescriptor mapKmerToRead = new MapKmerPositionToReadOperator(jobSpec,
+ MapKmerPositionToReadOperator.readIDOutputRec, readLength, kmerSize);
+ connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, mapKmerToRead, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return mapKmerToRead;
+ }*/
+
+/* public AbstractOperatorDescriptor generateGroupbyReadJob(JobSpecification jobSpec,
+ AbstractOperatorDescriptor mapKmerToRead) throws HyracksDataException {
+ int[] keyFields = new int[] { 0 }; // the id of grouped key
+ // (ReadID, {(PosInRead,{OtherPositoin..},Kmer) ...}
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(jobSpec, frameLimits, keyFields,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ MapKmerPositionToReadOperator.readIDOutputRec);
+ connectOperators(jobSpec, mapKmerToRead, ncNodeNames, sorter, ncNodeNames, new OneToOneConnectorDescriptor(
+ jobSpec));
+
+ RecordDescriptor readIDFinalRec = new RecordDescriptor(
+ new ISerializerDeserializer[1 + 2 * MergeReadIDAggregateFactory.getPositionCount(readLength, kmerSize)]);
+ Object[] objs = generateAggeragateDescriptorbyType(jobSpec, keyFields, new AggregateReadIDAggregateFactory(),
+ new MergeReadIDAggregateFactory(readLength, kmerSize), new ReadIDPartitionComputerFactory(), null,
+ IntegerPointable.FACTORY, AggregateReadIDAggregateFactory.readIDAggregateRec, readIDFinalRec);
+ AbstractOperatorDescriptor readLocalAggregator = (AbstractOperatorDescriptor) objs[0];
+ connectOperators(jobSpec, sorter, ncNodeNames, readLocalAggregator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+
+ logDebug("Group by ReadID merger");
+ IConnectorDescriptor readconn = (IConnectorDescriptor) objs[1];
+ AbstractOperatorDescriptor readCrossAggregator = (AbstractOperatorDescriptor) objs[2];
+ connectOperators(jobSpec, readLocalAggregator, ncNodeNames, readCrossAggregator, ncNodeNames, readconn);
+ return readCrossAggregator;
+ }*/
+
+/* public AbstractOperatorDescriptor generateMapperFromReadToNode(JobSpecification jobSpec,
+ AbstractOperatorDescriptor readCrossAggregator) {
+ // Map (ReadID, [(Poslist,Kmer) ... ]) to (Node, IncomingList,
+ // OutgoingList, Kmer)
+
+ AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec,
+ MapReadToNodeOperator.nodeOutputRec, kmerSize, true);
+ connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return mapEachReadToNode;
+ }
+
+ public AbstractOperatorDescriptor generateKmerWritorOperator(JobSpecification jobSpec,
+ AbstractOperatorDescriptor kmerCrossAggregator) throws HyracksException {
+ // Output Kmer
+ ITupleWriterFactory kmerWriter = null;
+ switch (outputFormat) {
+ case TEXT:
+ kmerWriter = new KMerTextWriterFactory(kmerSize);
+ break;
+ case BINARY:
+ default:
+ kmerWriter = new KMerSequenceWriterFactory(hadoopJobConfFactory.getConf());
+ break;
+ }
+ logDebug("WriteOperator");
+ HDFSWriteOperatorDescriptor writeKmerOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), kmerWriter);
+ connectOperators(jobSpec, kmerCrossAggregator, ncNodeNames, writeKmerOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return writeKmerOperator;
+ }
+
+ public AbstractOperatorDescriptor generateNodeWriterOpertator(JobSpecification jobSpec,
+ AbstractOperatorDescriptor mapEachReadToNode) throws HyracksException {
+ ITupleWriterFactory nodeWriter = null;
+ switch (outputFormat) {
+ case TEXT:
+ nodeWriter = new NodeTextWriterFactory(kmerSize);
+ break;
+ case BINARY:
+ default:
+ nodeWriter = new NodeSequenceWriterFactory(hadoopJobConfFactory.getConf());
+ break;
+ }
+ logDebug("WriteOperator");
+ // Output Node
+ HDFSWriteOperatorDescriptor writeNodeOperator = new HDFSWriteOperatorDescriptor(jobSpec,
+ hadoopJobConfFactory.getConf(), nodeWriter);
+ connectOperators(jobSpec, mapEachReadToNode, ncNodeNames, writeNodeOperator, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return writeNodeOperator;
+ }
+*/
+ @Override
+ public JobSpecification generateJob() throws HyracksException {
+
+ JobSpecification jobSpec = new JobSpecification();
+ logDebug("ReadKmer Operator");
+
+ HDFSReadOperatorDescriptor readOperator = createHDFSReader(jobSpec);
+
+ logDebug("Group by Kmer");
+ AbstractOperatorDescriptor lastOperator = generateGroupbyKmerJob(jobSpec, readOperator);
+
+ // logDebug("Write kmer to result");
+ // generateRootByWriteKmerGroupbyResult(jobSpec, lastOperator);
+
+// logDebug("Map Kmer to Read Operator");
+// lastOperator = generateMapperFromKmerToRead(jobSpec, lastOperator);
+
+// logDebug("Group by Read Operator");
+// lastOperator = generateGroupbyReadJob(jobSpec, lastOperator);
+
+/* logDebug("Generate final node");
+ lastOperator = generateMapperFromReadToNode(jobSpec, lastOperator);
+ logDebug("Write node to result");
+ lastOperator = generateNodeWriterOpertator(jobSpec, lastOperator);*/
+
+ jobSpec.addRoot(lastOperator);
+ return jobSpec;
+ }
+
+ protected void initJobConfiguration(Scheduler scheduler) throws HyracksDataException {
+ Configuration conf = confFactory.getConf();
+ readLength = conf.getInt(GenomixJobConf.READ_LENGTH, GenomixJobConf.DEFAULT_READLEN);
+ kmerSize = conf.getInt(GenomixJobConf.KMER_LENGTH, GenomixJobConf.DEFAULT_KMERLEN);
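+ // Force k to be odd (assumed rationale: an odd-length k-mer can never equal its own reverse complement).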
+ if (kmerSize % 2 == 0) {
+ kmerSize--;
+ conf.setInt(GenomixJobConf.KMER_LENGTH, kmerSize);
+ }
+ frameLimits = conf.getInt(GenomixJobConf.FRAME_LIMIT, GenomixJobConf.DEFAULT_FRAME_LIMIT);
+ tableSize = conf.getInt(GenomixJobConf.TABLE_SIZE, GenomixJobConf.DEFAULT_TABLE_SIZE);
+ frameSize = conf.getInt(GenomixJobConf.FRAME_SIZE, GenomixJobConf.DEFAULT_FRAME_SIZE);
+
+ bGenerateReversedKmer = conf.getBoolean(GenomixJobConf.REVERSED_KMER, GenomixJobConf.DEFAULT_REVERSED);
+
+ String type = conf.get(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+ if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_EXTERNAL)) {
+ groupbyType = GroupbyType.EXTERNAL;
+ } else if (type.equalsIgnoreCase(GenomixJobConf.GROUPBY_TYPE_PRECLUSTER)) {
+ groupbyType = GroupbyType.PRECLUSTER;
+ } else {
+ groupbyType = GroupbyType.HYBRIDHASH;
+ }
+
+ String output = conf.get(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+ if (output.equalsIgnoreCase("text")) {
+ outputFormat = OutputFormat.TEXT;
+ } else {
+ outputFormat = OutputFormat.BINARY;
+ }
+ try {
+ hadoopJobConfFactory = new ConfFactory(new JobConf(conf));
+ InputSplit[] splits = hadoopJobConfFactory.getConf().getInputFormat()
+ .getSplits(hadoopJobConfFactory.getConf(), ncNodeNames.length);
+ readSchedule = scheduler.getLocationConstraints(splits);
+ } catch (IOException ex) {
+ throw new HyracksDataException(ex);
+ }
+
+ LOG.info("Genomix Graph Build Configuration");
+ LOG.info("Kmer:" + kmerSize);
+ LOG.info("Groupby type:" + type);
+ LOG.info("Output format:" + output);
+ LOG.info("Frame limit" + frameLimits);
+ LOG.info("Frame kmerByteSize" + frameSize);
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/MergeKmerAggregateFactory.java
new file mode 100644
index 0000000..f0f72b9
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/MergeKmerAggregateFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class MergeKmerAggregateFactory implements IAggregatorDescriptorFactory {
+ private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(MergeKmerAggregateFactory.class);
+
+ @Override
+ public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
+ throws HyracksDataException {
+ final int frameSize = ctx.getFrameSize();
+ return new IAggregatorDescriptor() {
+
+ private PositionReference position = new PositionReference();
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState(new ArrayBackedValueStorage());
+ }
+
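+ /*
+  * Aggregation strategy (as implemented below): the positions in field 1 are fixed-length
+  * PositionReference entries, and init()/aggregate() simply concatenate them into the
+  * ArrayBackedValueStorage held in the aggregate state.
+  */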
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ inputVal.reset();
+ int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
+ 1); offset += PositionReference.LENGTH) {
+ position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+ inputVal.append(position);
+ }
+ // add a fake field end offset to satisfy the caller
+ tupleBuilder.addFieldEndOffset();
+ }
+
+ @Override
+ public void reset() {
+
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ int leadOffset = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ for (int offset = accessor.getFieldStartOffset(tIndex, 1); offset < accessor.getFieldEndOffset(tIndex,
+ 1); offset += PositionReference.LENGTH) {
+ position.setNewReference(accessor.getBuffer().array(), leadOffset + offset);
+ inputVal.append(position);
+ }
+ }
+
+ @Override
+ public void outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ throw new IllegalStateException("partial result method should not be called");
+ }
+
+ @Override
+ public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ DataOutput fieldOutput = tupleBuilder.getDataOutput();
+ ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ try {
+ if (inputVal.getLength() > frameSize / 2) {
+ LOG.warn("MergeKmer: output data kmerByteSize is too big: " + inputVal.getLength());
+ }
+ fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+ tupleBuilder.addFieldEndOffset();
+
+ } catch (IOException e) {
+ throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ };
+
+ }
+}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
new file mode 100644
index 0000000..b1b22ac
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
@@ -0,0 +1,361 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.genomix.hyracks.contrail.graph;
+
+import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
+import edu.uci.ics.genomix.type.GeneCode;
+import edu.uci.ics.genomix.type.IntermediateNodeWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerBytesWritableFactory;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+import edu.uci.ics.genomix.type.ReadIDWritable;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+
+public class ReadsKeyValueParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
+ private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(ReadsKeyValueParserFactory.class);
+
+ public static final int OutputKmerField = 0;
+ public static final int OutputPosition = 1;
+
+ private final boolean bReversed;
+ private final int readLength;
+ private final int kmerSize;
+
+ public static final RecordDescriptor readKmerOutputRec = new RecordDescriptor(new ISerializerDeserializer[] { null,
+ null, null, null, null, null, null});
+
+ public ReadsKeyValueParserFactory(int readlength, int k, boolean bGenerateReversed) {
+ bReversed = bGenerateReversed;
+ this.readLength = readlength;
+ this.kmerSize = k;
+ }
+
+ public static enum KmerDir {
+ FORWARD,
+ REVERSE,
+ }
+
+ @Override
+ public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx) {
+ final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(2);
+ final ByteBuffer outputBuffer = ctx.allocateFrame();
+ final FrameTupleAppender outputAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outputAppender.reset(outputBuffer, true);
+
+ return new IKeyValueParser<LongWritable, Text>() {
+
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);//
+ private PositionReference pos = new PositionReference();//
+
+ private KmerBytesWritable preForwardKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable preReverseKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable curForwardKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable curReverseKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable nextForwardKmer = new KmerBytesWritable(kmerSize);
+ private KmerBytesWritable nextReverseKmer = new KmerBytesWritable(kmerSize);
+ private IntermediateNodeWritable outputNode = new IntermediateNodeWritable();
+ private ReadIDWritable readId = new ReadIDWritable();
+ private KmerListWritable kmerList = new KmerListWritable();
+
+ private KmerBytesWritableFactory kmerFactory = new KmerBytesWritableFactory(kmerSize);
+ private KmerDir preKmerDir = KmerDir.FORWARD;
+ private KmerDir curKmerDir = KmerDir.FORWARD;
+ private KmerDir nextKmerDir = KmerDir.FORWARD;
+
+ byte mateId = (byte) 0;
+
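+ // Parse one input line: validate the readID and the ACGT sequence, then split the read into kmers.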
+ @Override
+ public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
+ String[] geneLine = value.toString().split("\\t"); // split the line into readID and sequence
+ if (geneLine.length != 2) {
+ return;
+ }
+ int readID = 0;
+ try {
+ readID = Integer.parseInt(geneLine[0]);
+ } catch (NumberFormatException e) {
+ LOG.warn("Invalid data ");
+ return;
+ }
+
+ Pattern genePattern = Pattern.compile("[AGCT]+");
+ Matcher geneMatcher = genePattern.matcher(geneLine[1]);
+ boolean isValid = geneMatcher.matches();
+ if (isValid) {
+ if (geneLine[1].length() != readLength) {
+ LOG.warn("Invalid readlength at: " + readID);
+ return;
+ }
+ SplitReads(readID, geneLine[1].getBytes(), writer);
+ }
+ }
+
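+ // Walk the read one kmer at a time, emitting for each kmer a tuple that carries the read id and the edges to the previous and next kmers.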
+ private void SplitReads(int readID, byte[] array, IFrameWriter writer) {
+ if (kmerSize >= array.length) {
+ return;
+ }
+ /** first kmer **/
+ curForwardKmer.setByRead(array, 0);
+ curReverseKmer.set(kmerFactory.reverse(curForwardKmer));
+ curKmerDir = curForwardKmer.compareTo(curReverseKmer) >= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+ setNextKmer(array[kmerSize]);
+ readId.set(mateId, readID);
+ outputNode.setreadId(readId);
+ setEdgeListForNextKmer();
+ switch (curKmerDir) {
+ case FORWARD:
+ InsertToFrame(curForwardKmer, outputNode, writer);
+ break;
+ case REVERSE:
+ InsertToFrame(curReverseKmer, outputNode, writer);
+ break;
+ }
+ /** middle kmer **/
+ for (int i = kmerSize + 1; i < array.length; i++) {
+ setPreKmerByOldCurKmer();
+ setCurKmerByOldNextKmer();
+ setNextKmer(array[i]);
+ //set value.readId
+ readId.set(mateId, readID);
+ outputNode.setreadId(readId);
+ //set value.edgeList
+ setEdgeListForPreKmer();
+ setEdgeListForNextKmer();
+ //output mapper result
+ switch (curKmerDir) {
+ case FORWARD:
+ InsertToFrame(curForwardKmer, outputNode, writer);
+ break;
+ case REVERSE:
+ InsertToFrame(curReverseKmer, outputNode, writer);
+ break;
+ }
+ }
+ /** last kmer **/
+ setPreKmerByOldCurKmer();
+ setCurKmerByOldNextKmer();
+ //set value.readId
+ readId.set(mateId, readID);
+ outputNode.setreadId(readId);
+ //set value.edgeList
+ setEdgeListForPreKmer();
+ //output mapper result
+ switch (curKmerDir) {
+ case FORWARD:
+ InsertToFrame(curForwardKmer, outputNode, writer);
+ break;
+ case REVERSE:
+ InsertToFrame(curReverseKmer, outputNode, writer);
+ break;
+ }
+ }
+
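+ // Derive the previous kmer by shifting the current forward kmer back by one character and record its orientation (not called by SplitReads at present).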
+ public void setPreKmer(byte preChar) {
+ preForwardKmer.set(curForwardKmer);
+ preForwardKmer.shiftKmerWithPreChar(preChar);
+ preReverseKmer.set(kmerFactory.reverse(preForwardKmer));
+ preKmerDir = preForwardKmer.compareTo(preReverseKmer) >= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+ }
+
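+ // Derive the next kmer by shifting the current forward kmer forward by one character and record its orientation.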
+ public void setNextKmer(byte nextChar) {
+ nextForwardKmer.set(curForwardKmer);
+ nextForwardKmer.shiftKmerWithNextChar(nextChar);
+ nextReverseKmer.set(kmerFactory.reverse(nextForwardKmer));
+ nextKmerDir = nextForwardKmer.compareTo(nextReverseKmer) >= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+ }
+
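+ // Slide the window: the old curKmer becomes the new preKmer.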
+ public void setPreKmerByOldCurKmer() {
+ preKmerDir = curKmerDir;
+ preForwardKmer.set(curForwardKmer);
+ preReverseKmer.set(curReverseKmer);
+ }
+
+ // Slide the window: the old nextKmer becomes the new curKmer.
+ public void setCurKmerByOldNextKmer() {
+ curKmerDir = nextKmerDir;
+ curForwardKmer.set(nextForwardKmer);
+ curReverseKmer.set(nextReverseKmer);
+ }
+
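+ // Record the edge to the next kmer in the adjacency list selected by the orientations of the current and next kmers (FF, FR, RF or RR).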
+ public void setEdgeListForNextKmer() {
+ switch (curKmerDir) {
+ case FORWARD:
+ switch (nextKmerDir) {
+ case FORWARD:
+ kmerList.reset();
+ kmerList.append(nextForwardKmer);
+ outputNode.setFFList(kmerList);
+ break;
+ case REVERSE:
+ kmerList.reset();
+ kmerList.append(nextReverseKmer);
+ outputNode.setFRList(kmerList);
+ break;
+ }
+ break;
+ case REVERSE:
+ switch (nextKmerDir) {
+ case FORWARD:
+ kmerList.reset();
+ kmerList.append(nextForwardKmer);
+ outputNode.setRFList(kmerList);
+ break;
+ case REVERSE:
+ kmerList.reset();
+ kmerList.append(nextReverseKmer);
+ outputNode.setRRList(kmerList);
+ break;
+ }
+ break;
+ }
+ }
+
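+ // Record the edge to the previous kmer: it goes into the R* lists when the current kmer is kept forward and into the F* lists when it is stored reversed.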
+ public void setEdgeListForPreKmer() {
+ switch (curKmerDir) {
+ case FORWARD:
+ switch (preKmerDir) {
+ case FORWARD:
+ kmerList.reset();
+ kmerList.append(preForwardKmer);
+ outputNode.setRRList(kmerList);
+ break;
+ case REVERSE:
+ kmerList.reset();
+ kmerList.append(preReverseKmer);
+ outputNode.setRFList(kmerList);
+ break;
+ }
+ break;
+ case REVERSE:
+ switch (preKmerDir) {
+ case FORWARD:
+ kmerList.reset();
+ kmerList.append(preForwardKmer);
+ outputNode.setFRList(kmerList);
+ break;
+ case REVERSE:
+ kmerList.reset();
+ kmerList.append(preReverseKmer);
+ outputNode.setFFList(kmerList);
+ break;
+ }
+ break;
+ }
+ }
+
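+ // Serialize one output tuple: the kmer bytes, the read id, then for each of the four adjacency lists its position count followed by its raw bytes; the frame is flushed and the append retried when full.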
+ private void InsertToFrame(KmerBytesWritable kmer, IntermediateNodeWritable node, IFrameWriter writer) {
+ try {
+ tupleBuilder.reset();
+ tupleBuilder.addField(kmer.getBytes(), kmer.getOffset(), kmer.getLength());
+
+ tupleBuilder.addField(node.getreadId().getByteArray(), node.getreadId().getStartOffset(), node.getreadId().getLength());
+
+ tupleBuilder.getDataOutput().writeInt(node.getFFList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(node.getFFList().getByteArray(), node.getFFList().getStartOffset(), node.getFFList().getLength());
+
+ tupleBuilder.getDataOutput().writeInt(node.getFRList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(node.getFRList().getByteArray(), node.getFRList().getStartOffset(), node.getFRList().getLength());
+
+ tupleBuilder.getDataOutput().writeInt(node.getRFList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(node.getRFList().getByteArray(), node.getRFList().getStartOffset(), node.getRFList().getLength());
+
+ tupleBuilder.getDataOutput().writeInt(node.getRRList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(node.getRRList().getByteArray(), node.getRRList().getStartOffset(), node.getRRList().getLength());
+
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new IllegalStateException(
+ "Failed to copy an record into a frame: the record kmerByteSize is too large.");
+ }
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void open(IFrameWriter writer) throws HyracksDataException {
+ }
+
+ @Override
+ public void close(IFrameWriter writer) throws HyracksDataException {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ }
+ };
+ }
+
+}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index bd761a5..4ce59a0 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -75,8 +75,8 @@
@Test
public void TestAll() throws Exception {
- TestReader();
-// TestGroupbyKmer();
+// TestReader();
+ TestGroupbyKmer();
// TestMapKmerToRead();
// TestGroupByReadID();
// TestEndToEnd();