add unmerged option in hyracks graph building
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
index b690da2..77907ee 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/dataflow/MapReadToNodeOperator.java
@@ -38,10 +38,12 @@
public class MapReadToNodeOperator extends AbstractSingleActivityOperatorDescriptor {
- public MapReadToNodeOperator(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int kmerSize) {
+ public MapReadToNodeOperator(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int kmerSize,
+ boolean bMergeNode) {
super(spec, 1, 1);
recordDescriptors[0] = outRecDesc;
this.kmerSize = kmerSize;
+ this.DoMergeNodeInRead = bMergeNode;
}
/**
@@ -61,6 +63,8 @@
public static final int OutputReverseReverseField = 5;
public static final int OutputKmerBytesField = 6;
+ public final boolean DoMergeNodeInRead;
+
public static final RecordDescriptor nodeOutputRec = new RecordDescriptor(new ISerializerDeserializer[7]);
/**
@@ -141,9 +145,9 @@
for (int i = InputInfoFieldStart + 2; i < accessor.getFieldCount(); i += 2) {
readNodesInfo(tIndex, readID, nextNodeEntry, nextNextNodeEntry, i);
- if (curNodeEntry.inDegree() > 1 || curNodeEntry.outDegree() > 0 || nextNodeEntry.inDegree() > 0
- || nextNodeEntry.outDegree() > 0 || nextNextNodeEntry.inDegree() > 0
- || nextNextNodeEntry.outDegree() > 0) {
+ if (!DoMergeNodeInRead || curNodeEntry.inDegree() > 1 || curNodeEntry.outDegree() > 0
+ || nextNodeEntry.inDegree() > 0 || nextNodeEntry.outDegree() > 0
+ || nextNextNodeEntry.inDegree() > 0 || nextNextNodeEntry.outDegree() > 0) {
connect(curNodeEntry, nextNodeEntry);
outputNode(curNodeEntry);
curNodeEntry.set(nextNodeEntry);
@@ -203,7 +207,7 @@
private void setReverseOutgoingList(NodeReference node, int offset) {
setCachList(offset);
- for(int i = 0; i < cachePositionList.getCountOfPosition(); i++){
+ for (int i = 0; i < cachePositionList.getCountOfPosition(); i++) {
PositionWritable pos = cachePositionList.getPosition(i);
if (pos.getPosInRead() > 0) {
node.getRFList().append(pos);
@@ -215,7 +219,7 @@
private void setReverseIncomingList(NodeReference node, int offset) {
setCachList(offset);
- for(int i = 0; i < cachePositionList.getCountOfPosition(); i++){
+ for (int i = 0; i < cachePositionList.getCountOfPosition(); i++) {
PositionWritable pos = cachePositionList.getPosition(i);
if (pos.getPosInRead() > 0) {
if (pos.getPosInRead() > 1) {
@@ -233,7 +237,7 @@
private void setForwardOutgoingList(NodeReference node, int offset) {
setCachList(offset);
- for(int i = 0; i < cachePositionList.getCountOfPosition(); i++){
+ for (int i = 0; i < cachePositionList.getCountOfPosition(); i++) {
PositionWritable pos = cachePositionList.getPosition(i);
if (pos.getPosInRead() > 0) {
node.getFFList().append(pos);
@@ -245,7 +249,7 @@
private void setForwardIncomingList(NodeReference node, int offset) {
setCachList(offset);
- for(int i = 0; i < cachePositionList.getCountOfPosition(); i++){
+ for (int i = 0; i < cachePositionList.getCountOfPosition(); i++) {
PositionWritable pos = cachePositionList.getPosition(i);
if (pos.getPosInRead() > 0) {
if (pos.getPosInRead() > 1) {
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
index 35bd838..4c24919 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/driver/Driver.java
@@ -31,6 +31,7 @@
import edu.uci.ics.genomix.hyracks.job.JobGenCreateKmerInfo;
import edu.uci.ics.genomix.hyracks.job.JobGenGroupbyReadID;
import edu.uci.ics.genomix.hyracks.job.JobGenMapKmerToRead;
+import edu.uci.ics.genomix.hyracks.job.JobGenUnMerged;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
@@ -47,6 +48,7 @@
OUTPUT_MAP_KMER_TO_READ,
OUTPUT_GROUPBY_READID,
BUILD_DEBRUJIN_GRAPH,
+ BUILD_UNMERGED_GRAPH,
}
private static final String IS_PROFILING = "genomix.driver.profiling";
@@ -109,6 +111,8 @@
case CHECK_KMERREADER:
jobGen = new JobGenCheckReader(job, scheduler, ncMap, numPartitionPerMachine);
break;
+ case BUILD_UNMERGED_GRAPH:
+ jobGen = new JobGenUnMerged(job, scheduler, ncMap, numPartitionPerMachine);
}
start = System.currentTimeMillis();
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
index 02087de..09794d0 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenBrujinGraph.java
@@ -258,7 +258,7 @@
// OutgoingList, Kmer)
AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec,
- MapReadToNodeOperator.nodeOutputRec, kmerSize);
+ MapReadToNodeOperator.nodeOutputRec, kmerSize, true);
connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
new OneToOneConnectorDescriptor(jobSpec));
return mapEachReadToNode;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenUnMerged.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenUnMerged.java
new file mode 100644
index 0000000..21b6385
--- /dev/null
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/job/JobGenUnMerged.java
@@ -0,0 +1,34 @@
+package edu.uci.ics.genomix.hyracks.job;
+
+import java.util.Map;
+
+import edu.uci.ics.genomix.hyracks.dataflow.MapReadToNodeOperator;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
+
+public class JobGenUnMerged extends JobGenBrujinGraph {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public JobGenUnMerged(GenomixJobConf job, Scheduler scheduler, Map<String, NodeControllerInfo> ncMap,
+ int numPartitionPerMachine) throws HyracksDataException {
+ super(job, scheduler, ncMap, numPartitionPerMachine);
+ }
+
+ @Override
+ public AbstractOperatorDescriptor generateMapperFromReadToNode(JobSpecification jobSpec,
+ AbstractOperatorDescriptor readCrossAggregator) {
+ AbstractOperatorDescriptor mapEachReadToNode = new MapReadToNodeOperator(jobSpec,
+ MapReadToNodeOperator.nodeOutputRec, kmerSize, false);
+ connectOperators(jobSpec, readCrossAggregator, ncNodeNames, mapEachReadToNode, ncNodeNames,
+ new OneToOneConnectorDescriptor(jobSpec));
+ return mapEachReadToNode;
+ }
+}
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index 0ee2253..3a8746c 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -60,6 +60,7 @@
private static final String EXPECTED_KMER_TO_READID = EXPECTED_DIR + "result_after_kmer2readId";
private static final String EXPECTED_GROUPBYREADID = EXPECTED_DIR + "result_after_readIDAggreage";
private static final String EXPECTED_OUPUT_NODE = EXPECTED_DIR + "result_after_generateNode";
+ private static final String EXPECTED_UNMERGED = EXPECTED_DIR + "result_unmerged";
private static final String DUMPED_RESULT = ACTUAL_RESULT_DIR + HDFS_OUTPUT_PATH + "/merged.txt";
private static final String CONVERT_RESULT = DUMPED_RESULT + ".txt";
@@ -79,6 +80,15 @@
TestMapKmerToRead();
TestGroupByReadID();
TestEndToEnd();
+ TestUnMergedNode();
+ }
+
+ public void TestUnMergedNode() throws Exception {
+ conf.set(GenomixJobConf.OUTPUT_FORMAT, GenomixJobConf.OUTPUT_FORMAT_BINARY);
+ cleanUpReEntry();
+ conf.set(GenomixJobConf.GROUPBY_TYPE, GenomixJobConf.GROUPBY_TYPE_PRECLUSTER);
+ driver.runJob(new GenomixJobConf(conf), Plan.BUILD_UNMERGED_GRAPH, true);
+ Assert.assertEquals(true, checkResults(EXPECTED_UNMERGED, new int[] { 1, 2, 3, 4 }));
}
public void TestReader() throws Exception {
diff --git a/genomix/genomix-hyracks/src/test/resources/expected/result_unmerged b/genomix/genomix-hyracks/src/test/resources/expected/result_unmerged
new file mode 100644
index 0000000..f617779
--- /dev/null
+++ b/genomix/genomix-hyracks/src/test/resources/expected/result_unmerged
@@ -0,0 +1,24 @@
+((1,1) [(1,2)] [] [] [] AATAG)
+((1,2) [(1,3)] [] [] [(1,1)] ATAGA)
+((1,3) [(6,1),(1,4)] [] [] [(1,2)] TAGAA)
+((1,4) [(6,2)] [] [] [(1,3)] AGAAG)
+((2,1) [(2,2)] [] [] [] AATAG)
+((2,2) [(2,3)] [] [] [(2,1)] ATAGC)
+((2,3) [(2,4)] [] [] [(2,2)] TAGCT)
+((2,4) [] [] [] [(2,3)] AGCTT)
+((3,1) [(3,2)] [] [] [] AATAG)
+((3,2) [(3,3)] [] [] [(3,1)] ATAGA)
+((3,3) [(6,1),(3,4)] [] [] [(3,2)] TAGAA)
+((3,4) [(6,2)] [] [] [(3,3)] AGAAG)
+((4,1) [(4,2)] [] [] [] AATAG)
+((4,2) [(4,3)] [] [] [(4,1)] ATAGC)
+((4,3) [(4,4)] [] [] [(4,2)] TAGCT)
+((4,4) [] [] [] [(4,3)] AGCTT)
+((5,1) [(5,2)] [] [] [] AATAG)
+((5,2) [(5,3)] [] [] [(5,1)] ATAGA)
+((5,3) [(6,1),(5,4)] [] [] [(5,2)] TAGAA)
+((5,4) [(6,2)] [] [] [(5,3)] AGAAG)
+((6,1) [(6,2)] [] [] [(1,3),(3,3),(5,3)] AGAAG)
+((6,2) [(6,3)] [] [] [(3,4),(1,4),(5,4),(6,1)] GAAGA)
+((6,3) [(6,4)] [] [] [(6,2)] AAGAA)
+((6,4) [] [] [] [(6,3)] AGAAG)