NodeWritable: Remove nodeIdList, refactor edges, add threads, add start/end reads
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
index 6baf86c..0232054 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -33,33 +33,22 @@
private float averageCoverage;
// merge/update directions
+
+ // merge/update directions
public static class DirectionFlag {
public static final byte DIR_FF = 0b00 << 0;
public static final byte DIR_FR = 0b01 << 0;
public static final byte DIR_RF = 0b10 << 0;
public static final byte DIR_RR = 0b11 << 0;
public static final byte DIR_MASK = 0b11 << 0;
- }
-
- public enum Dir {
- FF(0b00 << 0),
- FR(0b01 << 0),
- RF(0b10 << 0),
- RR(0b11 << 0);
- private byte value;
- private Dir(int val) {
- value = (byte) val;
- }
- public byte getValue() {
- return value;
- }
+ public static final byte[] values = {DIR_FF, DIR_FR, DIR_RF, DIR_RR};
}
-
+
public NodeWritable() {
- for (Dir d: Dir.values()) {
- edges[d.getValue()] = new VKmerListWritable();
- threads[d.getValue()] = new PositionListWritable();
+ for (byte d: DirectionFlag.values) {
+ edges[d] = new VKmerListWritable();
+ threads[d] = new PositionListWritable();
}
startReads = new PositionListWritable();
endReads = new PositionListWritable();
@@ -81,30 +70,26 @@
public void set(VKmerListWritable[] edges, PositionListWritable[] threads,
PositionListWritable startReads, PositionListWritable endReads,
VKmerBytesWritable kmer2, float coverage) {
- for (Dir d: Dir.values()) {
- this.edges[d.getValue()].setCopy(edges[d.getValue()]);
- this.threads[d.getValue()].set(threads[d.getValue()]);
+ for (byte d: DirectionFlag.values) {
+ this.edges[d].setCopy(edges[d]);
+ this.threads[d].set(threads[d]);
}
+ this.startReads.set(startReads);
+ this.endReads.set(endReads);
this.kmer.setAsCopy(kmer2);
this.averageCoverage = coverage;
}
public void reset() {
- for (Dir d: Dir.values()) {
- edges[d.getValue()].reset();
- threads[d.getValue()].reset();
+ for (byte d: DirectionFlag.values) {
+ edges[d].reset();
+ threads[d].reset();
}
+ startReads.reset();
+ endReads.reset();
this.kmer.reset(0);
averageCoverage = 0;
}
-
- public PositionListWritable getNodeIdList() {
- return nodeIdList;
- }
-
- public void setNodeIdList(PositionListWritable nodeIdList) {
- this.nodeIdList.set(nodeIdList);
- }
public VKmerBytesWritable getKmer() {
return kmer;
@@ -119,35 +104,35 @@
}
public VKmerListWritable getFFList() {
- return forwardForwardList;
+ return edges[DirectionFlag.DIR_FF];
}
public VKmerListWritable getFRList() {
- return forwardReverseList;
+ return edges[DirectionFlag.DIR_FR];
}
public VKmerListWritable getRFList() {
- return reverseForwardList;
+ return edges[DirectionFlag.DIR_RF];
}
public VKmerListWritable getRRList() {
- return reverseReverseList;
+ return edges[DirectionFlag.DIR_RR];
}
public void setFFList(VKmerListWritable forwardForwardList) {
- this.forwardForwardList.setCopy(forwardForwardList);
+ this.edges[DirectionFlag.DIR_FF].setCopy(forwardForwardList);
}
public void setFRList(VKmerListWritable forwardReverseList) {
- this.forwardReverseList.setCopy(forwardReverseList);
+ this.edges[DirectionFlag.DIR_FR].setCopy(forwardReverseList);
}
public void setRFList(VKmerListWritable reverseForwardList) {
- this.reverseForwardList.setCopy(reverseForwardList);
+ this.edges[DirectionFlag.DIR_RF].setCopy(reverseForwardList);
}
public void setRRList(VKmerListWritable reverseReverseList) {
- this.reverseReverseList.setCopy(reverseReverseList);
+ this.edges[DirectionFlag.DIR_RR].setCopy(reverseReverseList);
}
public VKmerListWritable getListFromDir(byte dir) {
@@ -198,8 +183,13 @@
* Returns the length of the byte-array version of this node
*/
public int getSerializedLength() {
- return nodeIdList.getLength() + forwardForwardList.getLength() + forwardReverseList.getLength() +
- reverseForwardList.getLength() + reverseReverseList.getLength() + kmer.getLength() + SIZE_FLOAT;
+ int length = 0;
+ for (byte d:DirectionFlag.values) {
+ length += edges[d].getLength();
+ length += threads[d].getLength();
+ }
+ length += startReads.getLength() + endReads.getLength() + kmer.getLength() + SIZE_FLOAT; // startReads/endReads are written by write() too
+ return length;
}
/**
@@ -214,51 +204,53 @@
public void setAsCopy(byte[] data, int offset) {
int curOffset = offset;
- nodeIdList.set(data, curOffset);
-
- curOffset += nodeIdList.getLength();
- forwardForwardList.setCopy(data, curOffset);
- curOffset += forwardForwardList.getLength();
- forwardReverseList.setCopy(data, curOffset);
- curOffset += forwardReverseList.getLength();
- reverseForwardList.setCopy(data, curOffset);
- curOffset += reverseForwardList.getLength();
- reverseReverseList.setCopy(data, curOffset);
-
- curOffset += reverseReverseList.getLength();
+ for (byte d:DirectionFlag.values) {
+ edges[d].setCopy(data, curOffset);
+ curOffset += edges[d].getLength();
+ }
+ for (byte d:DirectionFlag.values) {
+ threads[d].set(data, curOffset);
+ curOffset += threads[d].getLength();
+ }
+ startReads.set(data, curOffset);
+ curOffset += startReads.getLength();
+ endReads.set(data, curOffset);
+ curOffset += endReads.getLength();
kmer.setAsCopy(data, curOffset);
-
curOffset += kmer.getLength();
averageCoverage = Marshal.getFloat(data, curOffset);
}
public void setAsReference(byte[] data, int offset) {
int curOffset = offset;
- nodeIdList.setNewReference(data, curOffset);
+ for (byte d:DirectionFlag.values) {
+ edges[d].setNewReference(data, curOffset);
+ curOffset += edges[d].getLength();
+ }
+ for (byte d:DirectionFlag.values) {
+ threads[d].setNewReference(data, curOffset);
+ curOffset += threads[d].getLength();
+ }
+ startReads.setNewReference(data, curOffset);
+ curOffset += startReads.getLength();
+ endReads.setNewReference(data, curOffset);
+ curOffset += endReads.getLength();
- curOffset += nodeIdList.getLength();
- forwardForwardList.setNewReference(data, curOffset);
- curOffset += forwardForwardList.getLength();
- forwardReverseList.setNewReference(data, curOffset);
- curOffset += forwardReverseList.getLength();
- reverseForwardList.setNewReference(data, curOffset);
- curOffset += reverseForwardList.getLength();
- reverseReverseList.setNewReference(data, curOffset);
-
- curOffset += reverseReverseList.getLength();
- kmer.setAsReference(data, curOffset);
-
+ kmer.setAsReference(data, curOffset);
curOffset += kmer.getLength();
averageCoverage = Marshal.getFloat(data, curOffset);
}
@Override
public void write(DataOutput out) throws IOException {
- this.nodeIdList.write(out);
- this.forwardForwardList.write(out);
- this.forwardReverseList.write(out);
- this.reverseForwardList.write(out);
- this.reverseReverseList.write(out);
+ for (byte d:DirectionFlag.values) {
+ edges[d].write(out);
+ }
+ for (byte d:DirectionFlag.values) {
+ threads[d].write(out);
+ }
+ startReads.write(out);
+ endReads.write(out);
this.kmer.write(out);
out.writeFloat(averageCoverage);
}
@@ -266,11 +258,14 @@
@Override
public void readFields(DataInput in) throws IOException {
reset();
- this.nodeIdList.readFields(in);
- this.forwardForwardList.readFields(in);
- this.forwardReverseList.readFields(in);
- this.reverseForwardList.readFields(in);
- this.reverseReverseList.readFields(in);
+ for (byte d:DirectionFlag.values) {
+ edges[d].readFields(in);
+ }
+ for (byte d:DirectionFlag.values) {
+ threads[d].readFields(in);
+ }
+ startReads.readFields(in);
+ endReads.readFields(in);
this.kmer.readFields(in);
averageCoverage = in.readFloat();
}
@@ -294,49 +289,50 @@
@Override
public boolean equals(Object o) {
- if (o instanceof NodeWritable) {
- NodeWritable nw = (NodeWritable) o;
- return (this.nodeIdList.equals(nw.nodeIdList)
- && this.forwardForwardList.equals(nw.forwardForwardList)
- && this.forwardReverseList.equals(nw.forwardReverseList)
- && this.reverseForwardList.equals(nw.reverseForwardList)
- && this.reverseReverseList.equals(nw.reverseReverseList) && this.kmer.equals(nw.kmer));
+ if (! (o instanceof NodeWritable))
+ return false;
+
+ NodeWritable nw = (NodeWritable) o;
+ for (byte d:DirectionFlag.values) {
+ if (!edges[d].equals(nw.edges[d]) || !threads[d].equals(nw.threads[d]))
+ return false;
}
- return false;
+ return averageCoverage == nw.averageCoverage && startReads.equals(nw.startReads) && endReads.equals(nw.endReads) && kmer.equals(nw.kmer); // read lists are node state as well
}
@Override
public String toString() {
StringBuilder sbuilder = new StringBuilder();
sbuilder.append('{');
- sbuilder.append(nodeIdList.toString()).append('\t');
- sbuilder.append(forwardForwardList.toString()).append('\t');
- sbuilder.append(forwardReverseList.toString()).append('\t');
- sbuilder.append(reverseForwardList.toString()).append('\t');
- sbuilder.append(reverseReverseList.toString()).append('\t');
+ for (byte d: DirectionFlag.values) {
+ sbuilder.append(edges[d].toString()).append('\t');
+ }
+ for (byte d: DirectionFlag.values) {
+ sbuilder.append(threads[d].toString()).append('\t');
+ }
sbuilder.append(kmer.toString()).append('\t');
sbuilder.append(averageCoverage).append('x').append('}');
return sbuilder.toString();
}
public void mergeForwardNext(final NodeWritable nextNode, int initialKmerSize) {
- this.forwardForwardList.setCopy(nextNode.forwardForwardList);
- this.forwardReverseList.setCopy(nextNode.forwardReverseList);
+ edges[DirectionFlag.DIR_FF].setCopy(nextNode.edges[DirectionFlag.DIR_FF]);
+ edges[DirectionFlag.DIR_FR].setCopy(nextNode.edges[DirectionFlag.DIR_FR]);
kmer.mergeWithFFKmer(initialKmerSize, nextNode.getKmer());
}
public void mergeForwardPre(final NodeWritable preNode, int initialKmerSize) {
- this.reverseForwardList.setCopy(preNode.reverseForwardList);
- this.reverseReverseList.setCopy(preNode.reverseReverseList);
+ edges[DirectionFlag.DIR_RF].setCopy(preNode.edges[DirectionFlag.DIR_RF]);
+ edges[DirectionFlag.DIR_RR].setCopy(preNode.edges[DirectionFlag.DIR_RR]);
kmer.mergeWithRRKmer(initialKmerSize, preNode.getKmer());
}
public int inDegree() {
- return reverseReverseList.getCountOfPosition() + reverseForwardList.getCountOfPosition();
+ return edges[DirectionFlag.DIR_RR].getCountOfPosition() + edges[DirectionFlag.DIR_RF].getCountOfPosition();
}
public int outDegree() {
- return forwardForwardList.getCountOfPosition() + forwardReverseList.getCountOfPosition();
+ return edges[DirectionFlag.DIR_FF].getCountOfPosition() + edges[DirectionFlag.DIR_FR].getCountOfPosition();
}
/*
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
index 94c8ec1..eb367a1 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/AggregateKmerAggregateFactory.java
@@ -17,8 +17,10 @@
import java.io.DataOutput;
import java.io.IOException;
+
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -77,10 +79,9 @@
localUniNode.reset();
readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
- localUniNode.getFFList().appendList(readNode.getFFList());
- localUniNode.getFRList().appendList(readNode.getFRList());
- localUniNode.getRFList().appendList(readNode.getRFList());
- localUniNode.getRRList().appendList(readNode.getRRList());
+ for (byte d: DirectionFlag.values) {
+ localUniNode.getListFromDir(d).appendList(readNode.getListFromDir(d));
+ }
localUniNode.addCoverage(readNode);
// make an empty field
// tupleBuilder.addFieldEndOffset();// mark question?
@@ -92,10 +93,9 @@
NodeWritable localUniNode = (NodeWritable) state.state;
readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
localUniNode.getNodeIdList().appendList(readNode.getNodeIdList());
- localUniNode.getFFList().appendList(readNode.getFFList());
- localUniNode.getFRList().appendList(readNode.getFRList());
- localUniNode.getRFList().appendList(readNode.getRFList());
- localUniNode.getRRList().appendList(readNode.getRRList());
+ for (byte d: DirectionFlag.values) {
+ localUniNode.getListFromDir(d).appendList(readNode.getListFromDir(d));
+ }
localUniNode.addCoverage(readNode);
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
index f83bc4b..89a0e35 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/aggregators/MergeKmerAggregateFactory.java
@@ -23,6 +23,7 @@
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -71,10 +72,9 @@
localUniNode.reset();
readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
localUniNode.getNodeIdList().unionUpdate(readNode.getNodeIdList());
- localUniNode.getFFList().unionUpdate(readNode.getFFList());
- localUniNode.getFRList().unionUpdate(readNode.getFRList());
- localUniNode.getRFList().unionUpdate(readNode.getRFList());
- localUniNode.getRRList().unionUpdate(readNode.getRRList());
+ for (byte d: DirectionFlag.values) {
+ localUniNode.getListFromDir(d).unionUpdate(readNode.getListFromDir(d));
+ }
localUniNode.addCoverage(readNode);
//make a fake feild to cheat caller
// tupleBuilder.addFieldEndOffset();
@@ -91,10 +91,9 @@
NodeWritable localUniNode = (NodeWritable) state.state;
readNode.setAsReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
localUniNode.getNodeIdList().unionUpdate(readNode.getNodeIdList());
- localUniNode.getFFList().unionUpdate(readNode.getFFList());
- localUniNode.getFRList().unionUpdate(readNode.getFRList());
- localUniNode.getRFList().unionUpdate(readNode.getRFList());
- localUniNode.getRRList().unionUpdate(readNode.getRRList());
+ for (byte d: DirectionFlag.values) {
+ localUniNode.getListFromDir(d).unionUpdate(readNode.getListFromDir(d));
+ }
localUniNode.addCoverage(readNode);
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
index edb305f..5943f0a 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/format/InitialGraphCleanInputFormat.java
@@ -12,6 +12,7 @@
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.genomix.type.NodeWritable;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable.DirectionFlag;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;