Merge branch 'genomix/fullstack_genomix' into nanzhang/hyracks_genomix
Conflicts:
genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/IntermediateNodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java
similarity index 97%
rename from genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/IntermediateNodeWritable.java
rename to genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java
index 08cfa9d..220f45c 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/IntermediateNodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/IntermediateNodeWritable.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.genomix.type;
+package edu.uci.ics.genomix.oldtype;
import java.io.DataInput;
import java.io.DataOutput;
@@ -7,6 +7,9 @@
import org.apache.hadoop.io.WritableComparable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
public class IntermediateNodeWritable implements WritableComparable<IntermediateNodeWritable>, Serializable{
private static final long serialVersionUID = 1L;
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/NodeWritable.java
index 3931e7f..9fc1829 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/NodeWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/oldtype/NodeWritable.java
@@ -211,13 +211,13 @@
@Override
public String toString() {
StringBuilder sbuilder = new StringBuilder();
- sbuilder.append('(');
+ sbuilder.append('{');
sbuilder.append(nodeID.toString()).append('\t');
sbuilder.append(forwardForwardList.toString()).append('\t');
sbuilder.append(forwardReverseList.toString()).append('\t');
sbuilder.append(reverseForwardList.toString()).append('\t');
sbuilder.append(reverseReverseList.toString()).append('\t');
- sbuilder.append(kmer.toString()).append(')');
+ sbuilder.append(kmer.toString()).append('}');
return sbuilder.toString();
}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
index 0f91a7c..eb0bd59 100644
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/KmerListWritable.java
@@ -17,7 +17,7 @@
protected byte[] storage;
protected int offset;
protected int valueCount;
- public int kmerByteSize = 0; //default kmerSize = 5, kmerByteSize = 2, fix length once setting
+ public int kmerByteSize = 0;
public int kmerlength = 0;
protected static final byte[] EMPTY = {};
@@ -29,13 +29,10 @@
this.offset = 0;
}
- public KmerListWritable(int kmerSize) {
+ public KmerListWritable(int kmerlength) {
this();
- this.kmerByteSize = KmerUtil.getByteNumFromK(kmerSize);;
- }
-
- public KmerListWritable(int count, byte[] data, int offset) {
- setNewReference(count, data, offset);
+ this.kmerlength = kmerlength;
+ this.kmerByteSize = KmerUtil.getByteNumFromK(kmerlength);
}
public KmerListWritable(int kmerlength, int count, byte[] data, int offset) {
@@ -66,6 +63,20 @@
valueCount += 1;
}
+ /*
+ * Append the otherList to the end of myList
+ */
+ public void appendList(KmerListWritable otherList) {
+ if (otherList.valueCount > 0) {
+ setSize((valueCount + otherList.valueCount) * kmerByteSize);
+ // copy contents of otherList into the end of my storage
+ System.arraycopy(otherList.storage, otherList.offset,
+ storage, offset + valueCount * kmerByteSize,
+ otherList.valueCount * kmerByteSize);
+ valueCount += otherList.valueCount;
+ }
+ }
+
protected void setSize(int size) {
if (size > getCapacity()) {
setCapacity((size * 3 / 2));
@@ -87,7 +98,9 @@
}
}
- public void reset() {
+ public void reset(int kmerSize) {
+ kmerlength = kmerSize;
+ kmerByteSize = KmerUtil.getByteNumFromK(kmerlength);
storage = EMPTY;
valueCount = 0;
offset = 0;
@@ -177,10 +190,6 @@
public String toString() {
StringBuilder sbuilder = new StringBuilder();
sbuilder.append('[');
-// for (KmerBytesWritable kmer : this) {
-// sbuilder.append(kmer.toString());
-// sbuilder.append(',');
-// }
for(int i = 0; i < valueCount; i++){
sbuilder.append(getPosition(i).toString());
sbuilder.append(',');
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
new file mode 100644
index 0000000..4725e30
--- /dev/null
+++ b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/NodeWritable.java
@@ -0,0 +1,242 @@
+package edu.uci.ics.genomix.type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.WritableComparable;
+
+
+public class NodeWritable implements WritableComparable<NodeWritable>, Serializable{
+
+// public static class KMER{
+// public static final byte EXIST = 0;
+// public static final byte NON_EXIST = 1;
+// }
+
+ private static final long serialVersionUID = 1L;
+ public static final NodeWritable EMPTY_NODE = new NodeWritable(0);
+
+ private PositionListWritable nodeIdList;
+ private KmerListWritable forwardForwardList;
+ private KmerListWritable forwardReverseList;
+ private KmerListWritable reverseForwardList;
+ private KmerListWritable reverseReverseList;
+ private KmerBytesWritable kmer;
+// private byte kmerMark;
+
+ // merge/update directions
+ public static class DirectionFlag {
+ public static final byte DIR_FF = 0b00 << 0;
+ public static final byte DIR_FR = 0b01 << 0;
+ public static final byte DIR_RF = 0b10 << 0;
+ public static final byte DIR_RR = 0b11 << 0;
+ public static final byte DIR_MASK = 0b11 << 0;
+ }
+
+ public NodeWritable() {
+ this(0);
+ }
+
+ public NodeWritable(int kmerSize) {
+ nodeIdList = new PositionListWritable();
+ forwardForwardList = new KmerListWritable(kmerSize);
+ forwardReverseList = new KmerListWritable(kmerSize);
+ reverseForwardList = new KmerListWritable(kmerSize);
+ reverseReverseList = new KmerListWritable(kmerSize);
+ kmer = new KmerBytesWritable(kmerSize);
+// kmerMark = KMER.NON_EXIST;
+ }
+
+ public NodeWritable(PositionListWritable nodeIdList, KmerListWritable FFList, KmerListWritable FRList,
+ KmerListWritable RFList, KmerListWritable RRList, KmerBytesWritable kmer) {
+ this(kmer.getKmerLength());
+ set(nodeIdList, FFList, FRList, RFList, RRList, kmer);
+ }
+
+ public void set(NodeWritable node){
+ set(node.nodeIdList, node.forwardForwardList, node.forwardReverseList, node.reverseForwardList,
+ node.reverseReverseList, node.kmer);
+ }
+
+ public void set(PositionListWritable nodeIdList, KmerListWritable FFList, KmerListWritable FRList,
+ KmerListWritable RFList, KmerListWritable RRList, KmerBytesWritable kmer) {
+ this.nodeIdList.set(nodeIdList);
+ this.forwardForwardList.set(FFList);
+ this.forwardReverseList.set(FRList);
+ this.reverseForwardList.set(RFList);
+ this.reverseReverseList.set(RRList);
+ this.kmer.set(kmer);
+// kmerMark = KMER.EXIST;
+ }
+
+ public void reset(int kmerSize) {
+ nodeIdList.reset();
+ forwardForwardList.reset(kmerSize);
+ forwardReverseList.reset(kmerSize);
+ reverseForwardList.reset(kmerSize);
+ reverseReverseList.reset(kmerSize);
+ kmer.reset(kmerSize);
+// kmerMark = KMER.NON_EXIST;
+ }
+
+
+ public PositionListWritable getNodeIdList() {
+ return nodeIdList;
+ }
+
+ public void setNodeIdList(PositionListWritable nodeIdList) {
+ this.nodeIdList.set(nodeIdList);
+ }
+
+ public KmerBytesWritable getKmer() {
+ return kmer;
+ }
+
+ public void setKmer(KmerBytesWritable kmer) {
+// kmerMark = KMER.EXIST;
+ this.kmer.set(kmer);
+ }
+
+ public int getCount() {
+ return kmer.getKmerLength();
+ }
+
+ public KmerListWritable getFFList() {
+ return forwardForwardList;
+ }
+
+ public KmerListWritable getFRList() {
+ return forwardReverseList;
+ }
+
+ public KmerListWritable getRFList() {
+ return reverseForwardList;
+ }
+
+ public KmerListWritable getRRList() {
+ return reverseReverseList;
+ }
+
+ public void setFFList(KmerListWritable forwardForwardList) {
+ this.forwardForwardList.set(forwardForwardList);
+ }
+
+ public void setFRList(KmerListWritable forwardReverseList) {
+ this.forwardReverseList.set(forwardReverseList);
+ }
+
+ public void setRFList(KmerListWritable reverseForwardList) {
+ this.reverseForwardList.set(reverseForwardList);
+ }
+
+ public void setRRList(KmerListWritable reverseReverseList) {
+ this.reverseReverseList.set(reverseReverseList);
+ }
+
+ public KmerListWritable getListFromDir(byte dir) {
+ switch (dir & DirectionFlag.DIR_MASK) {
+ case DirectionFlag.DIR_FF:
+ return getFFList();
+ case DirectionFlag.DIR_FR:
+ return getFRList();
+ case DirectionFlag.DIR_RF:
+ return getRFList();
+ case DirectionFlag.DIR_RR:
+ return getRRList();
+ default:
+ throw new RuntimeException("Unrecognized direction in getListFromDir: " + dir);
+ }
+ }
+    @Override
+    public void write(DataOutput out) throws IOException {
+        // Field order here must mirror readFields() exactly.
+        this.nodeIdList.write(out);
+        this.forwardForwardList.write(out);
+        this.forwardReverseList.write(out);
+        this.reverseForwardList.write(out);
+        this.reverseReverseList.write(out);
+        // The kmer is still not serialized: the kmerMark presence flag that
+        // gated it is disabled throughout this class.
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        // Field order here must mirror write() exactly.
+        this.nodeIdList.readFields(in);
+        this.forwardForwardList.readFields(in);
+        this.forwardReverseList.readFields(in);
+        this.reverseForwardList.readFields(in);
+        this.reverseReverseList.readFields(in);
+        // The kmer is not read back because write() does not emit it; it keeps
+        // whatever value this instance already held.
+    }
+
+ @Override
+ public int compareTo(NodeWritable other) {
+ return this.kmer.compareTo(other.kmer);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.kmer.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof NodeWritable) {
+ NodeWritable nw = (NodeWritable) o;
+ return (this.nodeIdList.equals(nw.nodeIdList)
+ && this.forwardForwardList.equals(nw.forwardForwardList)
+ && this.forwardReverseList.equals(nw.forwardReverseList)
+ && this.reverseForwardList.equals(nw.reverseForwardList)
+ && this.reverseReverseList.equals(nw.reverseReverseList) && this.kmer.equals(nw.kmer));
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sbuilder = new StringBuilder();
+ sbuilder.append('(');
+ sbuilder.append(nodeIdList.toString()).append('\t');
+ sbuilder.append(forwardForwardList.toString()).append('\t');
+ sbuilder.append(forwardReverseList.toString()).append('\t');
+ sbuilder.append(reverseForwardList.toString()).append('\t');
+ sbuilder.append(reverseReverseList.toString()).append('\t');
+ sbuilder.append(kmer.toString()).append(')');
+ return sbuilder.toString();
+ }
+
+ public void mergeForwardNext(NodeWritable nextNode, int initialKmerSize) {
+ this.forwardForwardList.set(nextNode.forwardForwardList);
+ this.forwardReverseList.set(nextNode.forwardReverseList);
+ kmer.mergeWithFFKmer(initialKmerSize, nextNode.getKmer());
+ }
+
+ public void mergeForwardPre(NodeWritable preNode, int initialKmerSize) {
+ this.reverseForwardList.set(preNode.reverseForwardList);
+ this.reverseReverseList.set(preNode.reverseReverseList);
+ kmer.mergeWithRRKmer(initialKmerSize, preNode.getKmer());
+ }
+
+ public int inDegree() {
+ return reverseReverseList.getCountOfPosition() + reverseForwardList.getCountOfPosition();
+ }
+
+ public int outDegree() {
+ return forwardForwardList.getCountOfPosition() + forwardReverseList.getCountOfPosition();
+ }
+
+    /*
+     * Returns true if this node is a path-compressible node, that is, its in-degree and out-degree are both exactly 1
+     */
+ public boolean isPathNode() {
+ return inDegree() == 1 && outDegree() == 1;
+ }
+
+ public boolean isSimpleOrTerminalPath() {
+ return isPathNode() || (inDegree() == 0 && outDegree() == 1) || (inDegree() == 1 && outDegree() == 0);
+ }
+}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDListWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDListWritable.java
deleted file mode 100644
index 7afc1ef..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDListWritable.java
+++ /dev/null
@@ -1,246 +0,0 @@
-package edu.uci.ics.genomix.type;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.hadoop.io.Writable;
-import edu.uci.ics.genomix.data.Marshal;
-public class ReadIDListWritable implements Writable, Iterable<ReadIDWritable>, Serializable{
- private static final long serialVersionUID = 1L;
- protected byte[] storage;
- protected int offset;
- protected int valueCount;
- protected static final byte[] EMPTY = {};
-
- protected ReadIDWritable posIter = new ReadIDWritable();
-
- public ReadIDListWritable() {
- this.storage = EMPTY;
- this.valueCount = 0;
- this.offset = 0;
- }
-
- public ReadIDListWritable(int count, byte[] data, int offset) {
- setNewReference(count, data, offset);
- }
-
- public ReadIDListWritable(List<ReadIDWritable> posns) {
- this();
- setSize(posns.size()); // reserve space for all elements
- for (ReadIDWritable p : posns) {
- append(p);
- }
- }
-
- public void setNewReference(int count, byte[] data, int offset) {
- this.valueCount = count;
- this.storage = data;
- this.offset = offset;
- }
-
- public void append(long readIDByte) {
- setSize((1 + valueCount) * ReadIDWritable.LENGTH);
- Marshal.putLong(readIDByte, storage, offset + valueCount * ReadIDWritable.LENGTH);
- valueCount += 1;
- }
-
- public void append(byte mateId, long readId){
- append(ReadIDWritable.serialize(mateId, readId));
- }
-
- public void append(ReadIDWritable pos) {
- if(pos != null)
- append(pos.deserialize());
- else
- throw new RuntimeException("This position is null pointer!");
- }
-
- /*
- * Append the otherList to the end of myList
- */
- public void appendList(ReadIDListWritable otherList) {
- if (otherList.valueCount > 0) {
- setSize((valueCount + otherList.valueCount) * ReadIDWritable.LENGTH);
- // copy contents of otherList into the end of my storage
- System.arraycopy(otherList.storage, otherList.offset,
- storage, offset + valueCount * ReadIDWritable.LENGTH,
- otherList.valueCount * ReadIDWritable.LENGTH);
- valueCount += otherList.valueCount;
- }
- }
-
- public static int getCountByDataLength(int length) {
- if (length % ReadIDWritable.LENGTH != 0) {
- throw new IllegalArgumentException("Length of positionlist is invalid");
- }
- return length / ReadIDWritable.LENGTH;
- }
-
- public void set(ReadIDListWritable otherList) {
- set(otherList.valueCount, otherList.storage, otherList.offset);
- }
-
- public void set(int valueCount, byte[] newData, int offset) {
- this.valueCount = valueCount;
- setSize(valueCount * ReadIDWritable.LENGTH);
- if (valueCount > 0) {
- System.arraycopy(newData, offset, storage, this.offset, valueCount * ReadIDWritable.LENGTH);
- }
- }
-
- public void reset() {
- valueCount = 0;
- }
-
- protected void setSize(int size) {
- if (size > getCapacity()) {
- setCapacity((size * 3 / 2));
- }
- }
-
- protected int getCapacity() {
- return storage.length - offset;
- }
-
- protected void setCapacity(int new_cap) {
- if (new_cap > getCapacity()) {
- byte[] new_data = new byte[new_cap];
- if (storage.length - offset > 0) {
- System.arraycopy(storage, offset, new_data, 0, storage.length - offset);
- }
- storage = new_data;
- offset = 0;
- }
- }
-
- public ReadIDWritable getPosition(int i) {
- if (i >= valueCount) {
- throw new ArrayIndexOutOfBoundsException("No such positions");
- }
- posIter.setNewReference(storage, offset + i * ReadIDWritable.LENGTH);
- return posIter;
- }
-
- public void resetPosition(int i, long readIDByte) {
- if (i >= valueCount) {
- throw new ArrayIndexOutOfBoundsException("No such positions");
- }
- Marshal.putLong(readIDByte, storage, offset + i * ReadIDWritable.LENGTH);
- }
-
- public int getCountOfPosition() {
- return valueCount;
- }
-
- public byte[] getByteArray() {
- return storage;
- }
-
- public int getStartOffset() {
- return offset;
- }
-
- public int getLength() {
- return valueCount * ReadIDWritable.LENGTH;
- }
-
- @Override
- public Iterator<ReadIDWritable> iterator() {
- Iterator<ReadIDWritable> it = new Iterator<ReadIDWritable>() {
-
- private int currentIndex = 0;
-
- @Override
- public boolean hasNext() {
- return currentIndex < valueCount;
- }
-
- @Override
- public ReadIDWritable next() {
- return getPosition(currentIndex++);
- }
-
- @Override
- public void remove() {
- if(currentIndex < valueCount)
- System.arraycopy(storage, offset + currentIndex * ReadIDWritable.LENGTH,
- storage, offset + (currentIndex - 1) * ReadIDWritable.LENGTH,
- (valueCount - currentIndex) * ReadIDWritable.LENGTH);
- valueCount--;
- currentIndex--;
- }
- };
- return it;
- }
-
- /*
- * remove the first instance of @toRemove. Uses a linear scan. Throws an exception if not in this list.
- */
- public void remove(ReadIDWritable toRemove, boolean ignoreMissing) {
- Iterator<ReadIDWritable> posIterator = this.iterator();
- while (posIterator.hasNext()) {
- if(toRemove.equals(posIterator.next())) {
- posIterator.remove();
- return;
- }
- }
- if (!ignoreMissing) {
- throw new ArrayIndexOutOfBoundsException("the PositionWritable `" + toRemove.toString() + "` was not found in this list.");
- }
- }
-
- public void remove(ReadIDWritable toRemove) {
- remove(toRemove, false);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(valueCount);
- out.write(storage, offset, valueCount * ReadIDWritable.LENGTH);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.valueCount = in.readInt();
- setSize(valueCount * ReadIDWritable.LENGTH);
- in.readFully(storage, offset, valueCount * ReadIDWritable.LENGTH);
- }
-
- @Override
- public String toString() {
- StringBuilder sbuilder = new StringBuilder();
- sbuilder.append('[');
- for (ReadIDWritable pos : this) {
- sbuilder.append(pos.toString());
- sbuilder.append(',');
- }
- if (valueCount > 0) {
- sbuilder.setCharAt(sbuilder.length() - 1, ']');
- } else {
- sbuilder.append(']');
- }
- return sbuilder.toString();
- }
-
- @Override
- public int hashCode() {
- return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof PositionListWritable))
- return false;
- PositionListWritable other = (PositionListWritable) o;
- if (this.valueCount != other.valueCount)
- return false;
- for (int i=0; i < this.valueCount; i++) {
- if (!this.getPosition(i).equals(other.getPosition(i)))
- return false;
- }
- return true;
- }
-}
diff --git a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDWritable.java b/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDWritable.java
deleted file mode 100644
index b6c829a..0000000
--- a/genomix/genomix-data/src/main/java/edu/uci/ics/genomix/type/ReadIDWritable.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package edu.uci.ics.genomix.type;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.hadoop.io.WritableComparable;
-import edu.uci.ics.genomix.data.Marshal;
-
-public class ReadIDWritable implements WritableComparable<ReadIDWritable>, Serializable{
- private static final long serialVersionUID = 1L;
- protected byte[] storage;
- protected int offset;
- public static final int LENGTH = 6;
-
- public static final int totalBits = 48;
- private static final int bitsForMate = 1;
- private static final int readIdShift = bitsForMate;
-
- public ReadIDWritable() {
- storage = new byte[LENGTH];
- offset = 0;
- }
-
- public ReadIDWritable(byte mateId, long readId){
- this();
- set(mateId, readId);
- }
-
- public ReadIDWritable(byte[] storage, int offset) {
- setNewReference(storage, offset);
- }
-
- public void set(long readIDByte){
- Marshal.putLong(readIDByte, storage, offset);
- }
-
- public static long serialize(byte mateId, long readId) {
- return (readId << 1) + (mateId & 0b1);
- }
-
- public void set(byte mateId, long readId){
- Marshal.putLong(serialize(mateId, readId), storage, offset);
- }
-
- public void set(ReadIDWritable pos) {
- set(pos.getMateId(),pos.getReadId());
- }
-
- public void setNewReference(byte[] storage, int offset) {
- this.storage = storage;
- this.offset = offset;
- }
-
- public void reset(){
- storage = new byte[LENGTH];
- offset = 0;
- }
-
- public long deserialize(){
- return Marshal.getLong(storage, offset);
- }
-
- public byte getMateId(){
- return (byte) (Marshal.getLong(storage, offset) & 0b1);
- }
-
- public long getReadId(){
- return Marshal.getLong(storage, offset) >>> readIdShift;
- }
-
- public byte[] getByteArray() {
- return storage;
- }
-
- public int getStartOffset() {
- return offset;
- }
-
- public int getLength() {
- return LENGTH;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- in.readFully(storage, offset, LENGTH);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.write(storage, offset, LENGTH);
- }
-
- @Override
- public int hashCode() {
- return Marshal.hashBytes(getByteArray(), getStartOffset(), getLength());
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof ReadIDWritable))
- return false;
- ReadIDWritable other = (ReadIDWritable) o;
- return this.deserialize() == other.deserialize();
- }
-
- @Override
- public int compareTo(ReadIDWritable other) {
- return (this.deserialize() < other.deserialize()) ? -1 : ((this.deserialize() == other.deserialize()) ? 0 : 1);
- }
-
- /*
- * String of form "(readId-posID_mate)" where mate is _1 or _2
- */
- @Override
- public String toString() {
- return "(" + this.getReadId() + "_" + (this.getMateId() + 1) + ")";
- }
-}
diff --git a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
index c31ca6d..71246a1 100644
--- a/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
+++ b/genomix/genomix-data/src/test/java/edu/uci/ics/genomix/data/test/KmerListWritableTest.java
@@ -24,13 +24,13 @@
String randomString = generateString(i);
byte[] array = randomString.getBytes();
kmer.setByRead(array, 0);
- kmerList.reset();
+ kmerList.reset(kmer.getKmerLength());
kmerList.append(kmer);
Assert.assertEquals(kmerList.getPosition(0).toString(), randomString);
Assert.assertEquals(1, kmerList.getCountOfPosition());
}
- kmerList.reset();
+ kmerList.reset(kmer.getKmerLength());
//add one more kmer each time and fix kmerSize
for (int i = 0; i < 200; i++) {
kmer = new KmerBytesWritable(5);
diff --git a/genomix/genomix-hadoop/data/webmap/test.txt b/genomix/genomix-hadoop/data/webmap/test.txt
new file mode 100644
index 0000000..17770fa
--- /dev/null
+++ b/genomix/genomix-hadoop/data/webmap/test.txt
@@ -0,0 +1 @@
+1 AATAGAAG
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java
new file mode 100644
index 0000000..3723ed9
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixDriver.java
@@ -0,0 +1,85 @@
+package edu.uci.ics.genomix.hadoop.contrailgraphbuilding;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+
+
+@SuppressWarnings("deprecation")
+public class GenomixDriver {
+
+ private static class Options {
+ @Option(name = "-inputpath", usage = "the input path", required = true)
+ public String inputPath;
+
+ @Option(name = "-outputpath", usage = "the output path", required = true)
+ public String outputPath;
+
+ @Option(name = "-num-reducers", usage = "the number of reducers", required = true)
+ public int numReducers;
+
+ @Option(name = "-kmer-size", usage = "the size of kmer", required = true)
+ public int sizeKmer;
+
+ @Option(name = "-read-length", usage = "the length of read", required = true)
+ public int readLength;
+ }
+
+ public void run(String inputPath, String outputPath, int numReducers, int sizeKmer, int readLength,
+ boolean seqOutput, String defaultConfPath) throws IOException{
+ JobConf conf = new JobConf(GenomixDriver.class);
+ conf.setInt("sizeKmer", sizeKmer);
+ conf.setInt("readLength", readLength);
+ if (defaultConfPath != null) {
+ conf.addResource(new Path(defaultConfPath));
+ }
+
+ conf.setJobName("Genomix Graph Building");
+ conf.setMapperClass(GenomixMapper.class);
+ conf.setReducerClass(GenomixReducer.class);
+
+ conf.setMapOutputKeyClass(KmerBytesWritable.class);
+ conf.setMapOutputValueClass(NodeWritable.class);
+
+ //InputFormat and OutputFormat for Reducer
+ conf.setInputFormat(TextInputFormat.class);
+ if (seqOutput == true)
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ else
+ conf.setOutputFormat(TextOutputFormat.class);
+
+ //Output Key/Value Class
+ conf.setOutputKeyClass(KmerBytesWritable.class);
+ conf.setOutputValueClass(NodeWritable.class);
+
+ FileInputFormat.setInputPaths(conf, new Path(inputPath));
+ FileOutputFormat.setOutputPath(conf, new Path(outputPath));
+ conf.setNumReduceTasks(numReducers);
+
+ FileSystem dfs = FileSystem.get(conf);
+ dfs.delete(new Path(outputPath), true);
+ JobClient.runJob(conf);
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
+ GenomixDriver driver = new GenomixDriver();
+ driver.run(options.inputPath, options.outputPath, options.numReducers, options.sizeKmer,
+ options.readLength, true, null);
+ }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
new file mode 100644
index 0000000..3b615cb
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixMapper.java
@@ -0,0 +1,242 @@
+package edu.uci.ics.genomix.hadoop.contrailgraphbuilding;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
+import edu.uci.ics.genomix.type.PositionWritable;
+
+@SuppressWarnings("deprecation")
+public class GenomixMapper extends MapReduceBase implements
+ Mapper<LongWritable, Text, KmerBytesWritable, NodeWritable>{
+
+ public static enum KmerDir{
+ FORWARD,
+ REVERSE,
+ }
+
+ public static int KMER_SIZE;
+ private KmerBytesWritable preForwardKmer;
+ private KmerBytesWritable preReverseKmer;
+ private KmerBytesWritable curForwardKmer;
+ private KmerBytesWritable curReverseKmer;
+ private KmerBytesWritable nextForwardKmer;
+ private KmerBytesWritable nextReverseKmer;
+ private PositionWritable nodeId;
+ private PositionListWritable nodeIdList;
+ private KmerListWritable edgeListForPreKmer;
+ private KmerListWritable edgeListForNextKmer;
+ private NodeWritable outputNode;
+
+ private KmerDir preKmerDir;
+ private KmerDir curKmerDir;
+ private KmerDir nextKmerDir;
+
+ byte mateId = (byte)0;
+
+ @Override
+ public void configure(JobConf job) {
+ KMER_SIZE = Integer.parseInt(job.get("sizeKmer"));
+ preForwardKmer = new KmerBytesWritable(KMER_SIZE);
+ preReverseKmer = new KmerBytesWritable(KMER_SIZE);
+ curForwardKmer = new KmerBytesWritable(KMER_SIZE);
+ curReverseKmer = new KmerBytesWritable(KMER_SIZE);
+ nextForwardKmer = new KmerBytesWritable(KMER_SIZE);
+ nextReverseKmer = new KmerBytesWritable(KMER_SIZE);
+ nodeId = new PositionWritable();
+ nodeIdList = new PositionListWritable();
+ edgeListForPreKmer = new KmerListWritable(KMER_SIZE);
+ edgeListForNextKmer = new KmerListWritable(KMER_SIZE);
+ outputNode = new NodeWritable(KMER_SIZE);
+ preKmerDir = KmerDir.FORWARD;
+ curKmerDir = KmerDir.FORWARD;
+ nextKmerDir = KmerDir.FORWARD;
+ }
+
+ @Override
+ public void map(LongWritable key, Text value, OutputCollector<KmerBytesWritable, NodeWritable> output,
+ Reporter reporter) throws IOException {
+ /** first kmer */
+ String[] rawLine = value.toString().split("\\t"); // Read the Real Gene Line
+ if (rawLine.length != 2) {
+ throw new IOException("invalid data");
+ }
+ int readID = 0;
+ readID = Integer.parseInt(rawLine[0]);
+ String geneLine = rawLine[1];
+ Pattern genePattern = Pattern.compile("[AGCT]+");
+ Matcher geneMatcher = genePattern.matcher(geneLine);
+ boolean isValid = geneMatcher.matches();
+ if (isValid == true) {
+ byte[] array = geneLine.getBytes();
+ if (KMER_SIZE >= array.length) {
+ throw new IOException("short read");
+ }
+
+ /** first kmer **/
+ outputNode.reset(KMER_SIZE);
+ curForwardKmer.setByRead(array, 0);
+ curReverseKmer.setByReadReverse(array, 0);
+ curKmerDir = curForwardKmer.compareTo(curReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+ setNextKmer(array[KMER_SIZE]);
+ //set value.nodeId
+ setNodeId(mateId, readID, 1);
+ //set value.edgeList
+ setEdgeListForNextKmer();
+ //output mapper result
+ setMapperOutput(output);
+
+ /** middle kmer **/
+ for (int i = KMER_SIZE + 1; i < array.length; i++) {
+ outputNode.reset(KMER_SIZE);
+ setPreKmerByOldCurKmer();
+ setCurKmerByOldNextKmer();
+ setNextKmer(array[i]);
+ //set value.nodeId
+ setNodeId(mateId, readID, i - KMER_SIZE + 1);
+ //set value.edgeList
+ setEdgeListForPreKmer();
+ setEdgeListForNextKmer();
+ //output mapper result
+ setMapperOutput(output);
+ }
+
+ /** last kmer **/
+ outputNode.reset(KMER_SIZE);
+ setPreKmerByOldCurKmer();
+ setCurKmerByOldNextKmer();
+ //set value.nodeId
+ setNodeId(mateId, readID, array.length - KMER_SIZE + 1);
+ //set value.edgeList
+ setEdgeListForPreKmer();
+ //output mapper result
+ setMapperOutput(output);
+ }
+ }
+
+ public void setNodeId(byte mateId, long readID, int posId){
+ nodeId.set(mateId, readID, posId);
+ nodeIdList.reset();
+ nodeIdList.append(nodeId);
+ outputNode.setNodeIdList(nodeIdList);
+ }
+
+ public void setEdgeListForPreKmer(){
+ switch(curKmerDir){
+ case FORWARD:
+ switch(preKmerDir){
+ case FORWARD:
+ edgeListForPreKmer.reset(KMER_SIZE);
+ edgeListForPreKmer.append(preForwardKmer);
+ outputNode.setRRList(edgeListForPreKmer);
+ break;
+ case REVERSE:
+ edgeListForPreKmer.reset(KMER_SIZE);
+ edgeListForPreKmer.append(preReverseKmer);
+ outputNode.setRFList(edgeListForPreKmer);
+ break;
+ }
+ break;
+ case REVERSE:
+ switch(preKmerDir){
+ case FORWARD:
+ edgeListForPreKmer.reset(KMER_SIZE);
+ edgeListForPreKmer.append(preForwardKmer);
+ outputNode.setFRList(edgeListForPreKmer);
+ break;
+ case REVERSE:
+ edgeListForPreKmer.reset(KMER_SIZE);
+ edgeListForPreKmer.append(preReverseKmer);
+ outputNode.setFFList(edgeListForPreKmer);
+ break;
+ }
+ break;
+ }
+ }
+
+ public void setEdgeListForNextKmer(){
+ switch(curKmerDir){
+ case FORWARD:
+ switch(nextKmerDir){
+ case FORWARD:
+ edgeListForNextKmer.reset(KMER_SIZE);
+ edgeListForNextKmer.append(nextForwardKmer);
+ outputNode.setFFList(edgeListForNextKmer);
+ break;
+ case REVERSE:
+ edgeListForNextKmer.reset(KMER_SIZE);
+ edgeListForNextKmer.append(nextReverseKmer);
+ outputNode.setFRList(edgeListForNextKmer);
+ break;
+ }
+ break;
+ case REVERSE:
+ switch(nextKmerDir){
+ case FORWARD:
+ edgeListForNextKmer.reset(KMER_SIZE);
+ edgeListForNextKmer.append(nextForwardKmer);
+ outputNode.setRFList(edgeListForNextKmer);
+ break;
+ case REVERSE:
+ edgeListForNextKmer.reset(KMER_SIZE);
+ edgeListForNextKmer.append(nextReverseKmer);
+ outputNode.setRRList(edgeListForNextKmer);
+ break;
+ }
+ break;
+ }
+ }
+
+    //set preKmer by shifting curKmer with preChar; also refreshes preReverseKmer and preKmerDir
+    public void setPreKmer(byte preChar){
+        preForwardKmer.set(curForwardKmer);
+        preForwardKmer.shiftKmerWithPreChar(preChar);
+        preReverseKmer.setByReadReverse(preForwardKmer.toString().getBytes(), 0); // fresh array holds only this kmer, so start at index 0
+        preKmerDir = preForwardKmer.compareTo(preReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+    }
+
+    //set nextKmer by shifting curKmer with nextChar; also refreshes nextReverseKmer and nextKmerDir
+    public void setNextKmer(byte nextChar){
+        nextForwardKmer.set(curForwardKmer);
+        nextForwardKmer.shiftKmerWithNextChar(nextChar);
+        nextReverseKmer.setByReadReverse(nextForwardKmer.toString().getBytes(), 0); // fresh array holds only this kmer, so start at index 0
+        nextKmerDir = nextForwardKmer.compareTo(nextReverseKmer) <= 0 ? KmerDir.FORWARD : KmerDir.REVERSE;
+    }
+
+ //old curKmer becomes current preKmer
+ public void setPreKmerByOldCurKmer(){
+ preKmerDir = curKmerDir;
+ preForwardKmer.set(curForwardKmer);
+ preReverseKmer.set(curReverseKmer);
+ }
+
+    //old nextKmer becomes current curKmer: direction, forward kmer and reverse kmer all advance together
+    public void setCurKmerByOldNextKmer(){
+        curKmerDir = nextKmerDir;
+        curForwardKmer.set(nextForwardKmer);
+        curReverseKmer.set(nextReverseKmer);
+    }
+
+ public void setMapperOutput(OutputCollector<KmerBytesWritable, NodeWritable> output) throws IOException{
+ switch(curKmerDir){
+ case FORWARD:
+ output.collect(curForwardKmer, outputNode);
+ break;
+ case REVERSE:
+ output.collect(curReverseKmer, outputNode);
+ break;
+ }
+ }
+}
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
new file mode 100644
index 0000000..6472f05
--- /dev/null
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GenomixReducer.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.genomix.hadoop.contrailgraphbuilding;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+
+@SuppressWarnings("deprecation")
+public class GenomixReducer extends MapReduceBase implements
+ Reducer<KmerBytesWritable, NodeWritable, KmerBytesWritable, NodeWritable>{
+
+ private NodeWritable outputNode = new NodeWritable(); // merged node; reset at the start of every reduce() call
+ private NodeWritable tmpNode = new NodeWritable(); // scratch copy of the value currently being merged
+ @Override
+ public void reduce(KmerBytesWritable key, Iterator<NodeWritable> values,
+ OutputCollector<KmerBytesWritable, NodeWritable> output,
+ Reporter reporter) throws IOException {
+ outputNode.reset(0); // NOTE(review): reset is given 0 here - confirm 0 is a valid argument for NodeWritable.reset
+
+// Merge every NodeWritable received for this key into outputNode:
+// each value is first copied into tmpNode, then its node-id list and
+// its FF, FR, RF and RR lists are appended to the matching lists of
+// outputNode. Exactly one (key, outputNode) pair is emitted per call,
+// even when values is empty. The reporter argument is not used.
+ while (values.hasNext()) {
+ tmpNode.set(values.next());
+ outputNode.getNodeIdList().appendList(tmpNode.getNodeIdList());
+ outputNode.getFFList().appendList(tmpNode.getFFList());
+ outputNode.getFRList().appendList(tmpNode.getFRList());
+ outputNode.getRFList().appendList(tmpNode.getRFList());
+ outputNode.getRRList().appendList(tmpNode.getRRList());
+ }
+ output.collect(key,outputNode);
+ }
+
+}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
new file mode 100644
index 0000000..4716072
--- /dev/null
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/contrailgraphbuilding/GraphBuildingTest.java
@@ -0,0 +1,82 @@
+package edu.uci.ics.genomix.hadoop.contrailgraphbuilding;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.junit.Test;
+
+import edu.uci.ics.genomix.hadoop.pmcommon.HadoopMiniClusterTest;
+
+@SuppressWarnings("deprecation")
+public class GraphBuildingTest {
+ // Runs GenomixDriver against in-process mini DFS/MR clusters and copies the job output into ACTUAL_RESULT_DIR.
+ private JobConf conf = new JobConf();
+ private static final String ACTUAL_RESULT_DIR = "actual";
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+ private static final String DATA_PATH = "data/webmap/test.txt";
+ private static final String HDFS_PATH = "/webmap";
+ private static final String RESULT_PATH = "/result";
+
+// private static final int COUNT_REDUCER = 2;
+ private static final int SIZE_KMER = 5;
+ private static final int READ_LENGTH = 8;
+
+ private MiniDFSCluster dfsCluster;
+ private MiniMRCluster mrCluster;
+ private FileSystem dfs;
+
+ @Test
+ public void test() throws Exception {
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHadoop();
+ // always shut the mini clusters down, even when the job or the result dump throws
+ try { TestMapKmerToNode(); } finally { cleanupHadoop(); }
+ }
+
+ public void TestMapKmerToNode() throws Exception { // runs the driver on HDFS_PATH, then copies RESULT_PATH back to local disk
+ GenomixDriver driver = new GenomixDriver();
+ driver.run(HDFS_PATH, RESULT_PATH, 0, SIZE_KMER, READ_LENGTH, true, HADOOP_CONF_PATH);
+ dumpResult();
+ }
+
+ private void startHadoop() throws IOException { // starts both mini clusters, uploads DATA_PATH and writes conf to HADOOP_CONF_PATH
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, 1, true, null);
+ dfs = dfsCluster.getFileSystem();
+ mrCluster = new MiniMRCluster(1, dfs.getUri().toString(), 1);
+
+ Path src = new Path(DATA_PATH);
+ Path dest = new Path(HDFS_PATH + "/");
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+
+ DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
+
+ private void cleanupHadoop() throws IOException { // expects both clusters to have been started by startHadoop()
+ mrCluster.shutdown();
+ dfsCluster.shutdown();
+ }
+
+ private void dumpResult() throws IOException { // copies RESULT_PATH into ACTUAL_RESULT_DIR, then merges it into local "test.txt"
+ Path src = new Path(RESULT_PATH);
+ Path dest = new Path(ACTUAL_RESULT_DIR);
+ dfs.copyToLocalFile(src, dest);
+ HadoopMiniClusterTest.copyResultsToLocal(RESULT_PATH, "test.txt", false, conf, true, dfs);
+ }
+}
diff --git a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
index d4ae5dd..0f2d714 100644
--- a/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
+++ b/genomix/genomix-hadoop/src/test/java/edu/uci/ics/genomix/hadoop/pmcommon/HadoopMiniClusterTest.java
@@ -69,12 +69,19 @@
Configuration conf) throws IOException {
copyResultsToLocal(hdfsSrcDir, localDestFile, resultsAreText, conf, true);
}
+
+ public static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText, // convenience overload
+ Configuration conf, boolean ignoreZeroOutputs) throws IOException {
+ copyResultsToLocal(hdfsSrcDir, localDestFile, resultsAreText, // forwards to the six-argument version
+ conf, ignoreZeroOutputs, dfs); // dfs is resolved from this class's own scope
+ }
+
/*
* Merge and copy a DFS directory to a local destination, converting to text if necessary.
* Also locally store the binary-formatted result if available.
*/
- protected static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
- Configuration conf, boolean ignoreZeroOutputs) throws IOException {
+ public static void copyResultsToLocal(String hdfsSrcDir, String localDestFile, boolean resultsAreText,
+ Configuration conf, boolean ignoreZeroOutputs, FileSystem dfs) throws IOException {
if (resultsAreText) {
// for text files, just concatenate them together
FileUtil.copyMerge(FileSystem.get(conf), new Path(hdfsSrcDir), FileSystem.getLocal(new Configuration()),
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
index cec18bd..d9efd75 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/AggregateKmerAggregateFactory.java
@@ -18,14 +18,20 @@
import java.io.DataOutput;
import java.io.IOException;
+import edu.uci.ics.genomix.data.Marshal;
import edu.uci.ics.genomix.hyracks.data.primitive.PositionReference;
import edu.uci.ics.genomix.hyracks.dataflow.MapKmerPositionToReadOperator;
+import edu.uci.ics.genomix.type.KmerBytesWritable;
+import edu.uci.ics.genomix.type.KmerListWritable;
+import edu.uci.ics.genomix.type.NodeWritable;
+import edu.uci.ics.genomix.type.PositionListWritable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
@@ -36,13 +42,19 @@
*
*/
private static final long serialVersionUID = 1L;
-
+ private PositionListWritable nodeIdList = new PositionListWritable(); // scratch holders that init()/aggregate() point at the fields of the incoming tuple
+ private KmerListWritable forwardForwardList = new KmerListWritable(kmerSize);// TODO: how do we obtain kmerSize here? NOTE(review): kmerSize is not declared in the visible part of this class
+ private KmerListWritable forwardReverseList = new KmerListWritable(kmerSize);
+ private KmerListWritable reverseForwardList = new KmerListWritable(kmerSize);
+ private KmerListWritable reverseReverseList = new KmerListWritable(kmerSize);
+ private KmerBytesWritable kmer = new KmerBytesWritable(kmerSize);
+
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults)
throws HyracksDataException {
return new IAggregatorDescriptor() {
- private PositionReference position = new PositionReference();
+ private NodeWritable nodeAggreter = new NodeWritable();// TODO: can this be declared outside (on the factory) instead?
protected int getOffSet(IFrameTupleAccessor accessor, int tIndex, int fieldId) {
int tupleOffset = accessor.getTupleStartOffset(tIndex);
@@ -62,27 +74,65 @@
@Override
public AggregateState createAggregateStates() {
- return new AggregateState(new ArrayBackedValueStorage());
+ return new AggregateState(new NodeWritable());
}
@Override
public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
- ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
- inputVal.reset();
- position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
- inputVal.append(position);
-
+ NodeWritable inputVal = (NodeWritable) state.state; // aggregate state created by createAggregateStates()
+ inputVal.reset(kmerSize);
+ nodeIdList.reset();
+ forwardForwardList.reset(kmerSize);
+ forwardReverseList.reset(kmerSize);
+ reverseForwardList.reset(kmerSize);
+ reverseReverseList.reset(kmerSize);
+ // Decode the first tuple of the group: field 1 = k-mer, 2 = node id, then (count, list) pairs for FF (3,4), FR (5,6), RF (7,8), RR (9,10).
+ kmer.set(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));// ?? do the field indexes start from 1 ??
+ nodeIdList.setNewReference(1, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 2));
+ int ffCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 3));//??
+ forwardForwardList.setNewReference(ffCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 4));
+ int frCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 5));
+ forwardReverseList.setNewReference(frCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 6));
+ int rfCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 7));
+ reverseForwardList.setNewReference(rfCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 8)); // RF list uses its own count (field 7)
+ int rrCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 9));
+ reverseReverseList.setNewReference(rrCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 10)); // RR data goes to the RR list, not the RF list
+ nodeAggreter.set(nodeIdList, forwardForwardList, forwardReverseList, reverseForwardList, reverseReverseList, kmer);
+ // Fold the decoded k-mer and lists into the aggregate state.
+ inputVal.getKmer().set(kmer);
+ inputVal.getNodeIdList().appendList(nodeIdList);
+ inputVal.getFFList().appendList(forwardForwardList);
+ inputVal.getFRList().appendList(forwardReverseList);
+ inputVal.getRFList().appendList(reverseForwardList);
+ inputVal.getRRList().appendList(reverseReverseList);
+
// make an empty field
- tupleBuilder.addFieldEndOffset();
+ tupleBuilder.addFieldEndOffset();// ???? why is this needed
}
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws HyracksDataException {
- ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
- position.setNewReference(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));
- inputVal.append(position);
+ NodeWritable inputVal = (NodeWritable) state.state; // running aggregate for this group; not reset here
+ kmer.set(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 1));// ?? do the field indexes start from 1 ??
+ nodeIdList.setNewReference(1, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 2));
+ int ffCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 3));//??
+ forwardForwardList.setNewReference(ffCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 4));
+ int frCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 5));
+ forwardReverseList.setNewReference(frCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 6));
+ int rfCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 7));
+ reverseForwardList.setNewReference(rfCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 8)); // RF list uses its own count (field 7)
+ int rrCount = Marshal.getInt(accessor.getBuffer().array(), getOffSet(accessor, tIndex, 9));
+ reverseReverseList.setNewReference(rrCount, accessor.getBuffer().array(), getOffSet(accessor, tIndex, 10)); // RR data goes to the RR list, not the RF list
+ nodeAggreter.set(nodeIdList, forwardForwardList, forwardReverseList, reverseForwardList, reverseReverseList, kmer);
+ // Fold the decoded k-mer and lists of this tuple into the running aggregate.
+ inputVal.getKmer().set(kmer);
+ inputVal.getNodeIdList().appendList(nodeIdList);
+ inputVal.getFFList().appendList(forwardForwardList);
+ inputVal.getFRList().appendList(forwardReverseList);
+ inputVal.getRFList().appendList(reverseForwardList);
+ inputVal.getRRList().appendList(reverseReverseList);
}
@Override
@@ -95,11 +145,52 @@
public void outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
AggregateState state) throws HyracksDataException {
DataOutput fieldOutput = tupleBuilder.getDataOutput();
- ArrayBackedValueStorage inputVal = (ArrayBackedValueStorage) state.state;
+ NodeWritable inputVal = (NodeWritable) state.state;
try {
- fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+// fieldOutput.write(inputVal.getByteArray(), inputVal.getStartOffset(), inputVal.getLength());
+ tupleBuilder.addFieldEndOffset();// -------------- why is this needed?
+ //-------------------------------------------------------
+// tupleBuilder.reset();
+ fieldOutput.write(inputVal.getKmer().getBytes(), inputVal.getKmer().getOffset(), inputVal.getKmer().getLength());
+
+ tupleBuilder.addField(Node.getreadId().getByteArray(), Node.getreadId().getStartOffset(), Node.getreadId().getLength());
+
+ tupleBuilder.getDataOutput().writeInt(Node.getFFList().getCountOfPosition());
tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(Node.getFFList().getByteArray(), Node.getFFList().getStartOffset(), Node.getFFList()
+ .getLength());
+
+ tupleBuilder.getDataOutput().writeInt(Node.getFRList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(Node.getFRList().getByteArray(), Node.getFRList().getStartOffset(), Node.getFRList()
+ .getLength());
+
+ tupleBuilder.getDataOutput().writeInt(Node.getRFList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(Node.getRFList().getByteArray(), Node.getRFList().getStartOffset(), Node.getRFList()
+ .getLength());
+
+ tupleBuilder.getDataOutput().writeInt(Node.getRRList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
+ tupleBuilder.addField(Node.getRRList().getByteArray(), Node.getRRList().getStartOffset(), Node.getRRList()
+ .getLength());
+
+/* if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(outputBuffer, writer);
+ outputAppender.reset(outputBuffer, true);
+ if (!outputAppender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new IllegalStateException(
+ "Failed to copy an record into a frame: the record kmerByteSize is too large.");
+ }
+ }*/
+
} catch (IOException e) {
throw new HyracksDataException("I/O exception when writing aggregation to the output buffer.");
}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/IntermediateNodeWritable.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/IntermediateNodeWritable.java
deleted file mode 100644
index cc759a4..0000000
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/IntermediateNodeWritable.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package edu.uci.ics.genomix.hyracks.contrail.graph;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.io.WritableComparable;
-
-public class IntermediateNodeWritable implements WritableComparable<IntermediateNodeWritable>, Serializable{
-
- private static final long serialVersionUID = 1L;
- public static final IntermediateNodeWritable EMPTY_NODE = new IntermediateNodeWritable();
-
- private KmerListWritable forwardForwardList;
- private KmerListWritable forwardReverseList;
- private KmerListWritable reverseForwardList;
- private KmerListWritable reverseReverseList;
- private ReadIDWritable readId;
- public IntermediateNodeWritable(){
- forwardForwardList = new KmerListWritable();
- forwardReverseList = new KmerListWritable();
- reverseForwardList = new KmerListWritable();
- reverseReverseList = new KmerListWritable();
- readId = new ReadIDWritable();
- }
-
- public IntermediateNodeWritable(KmerListWritable FFList, KmerListWritable FRList,
- KmerListWritable RFList, KmerListWritable RRList, ReadIDWritable uniqueKey) {
- this();
- set(FFList, FRList, RFList, RRList, uniqueKey);
- }
-
- public void set(IntermediateNodeWritable node){
- set(node.forwardForwardList, node.forwardReverseList, node.reverseForwardList,
- node.reverseReverseList, node.readId);
- }
-
- public void set(KmerListWritable FFList, KmerListWritable FRList,
- KmerListWritable RFList, KmerListWritable RRList, ReadIDWritable uniqueKey) {
- this.forwardForwardList.set(FFList);
- this.forwardReverseList.set(FRList);
- this.reverseForwardList.set(RFList);
- this.reverseReverseList.set(RRList);
- this.readId.set(uniqueKey);
- }
-
- public void reset(int kmerSize) {
- forwardForwardList.reset();
- forwardReverseList.reset();
- reverseForwardList.reset();
- reverseReverseList.reset();
- readId.reset();
- }
-
- public KmerListWritable getFFList() {
- return forwardForwardList;
- }
-
- public void setFFList(KmerListWritable forwardForwardList) {
- this.forwardForwardList.set(forwardForwardList);
- }
-
- public KmerListWritable getFRList() {
- return forwardReverseList;
- }
-
- public void setFRList(KmerListWritable forwardReverseList) {
- this.forwardReverseList.set(forwardReverseList);
- }
-
- public KmerListWritable getRFList() {
- return reverseForwardList;
- }
-
- public void setRFList(KmerListWritable reverseForwardList) {
- this.reverseForwardList.set(reverseForwardList);
- }
-
- public KmerListWritable getRRList() {
- return reverseReverseList;
- }
-
- public void setRRList(KmerListWritable reverseReverseList) {
- this.reverseReverseList.set(reverseReverseList);
- }
-
- public ReadIDWritable getreadId() {
- return readId;
- }
-
- public void setreadId(ReadIDWritable readId) {
- this.readId.set(readId);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.forwardForwardList.readFields(in);
- this.forwardReverseList.readFields(in);
- this.reverseForwardList.readFields(in);
- this.reverseReverseList.readFields(in);
- this.readId.readFields(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- this.forwardForwardList.write(out);
- this.forwardReverseList.write(out);
- this.reverseForwardList.write(out);
- this.reverseReverseList.write(out);
- this.readId.write(out);
- }
-
- @Override
- public int compareTo(IntermediateNodeWritable other) {
- // TODO Auto-generated method stub
- return this.readId.compareTo(other.readId);
- }
-
- @Override
- public int hashCode() {
- return this.readId.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof IntermediateNodeWritable) {
- IntermediateNodeWritable nw = (IntermediateNodeWritable) o;
- return (this.forwardForwardList.equals(nw.forwardForwardList)
- && this.forwardReverseList.equals(nw.forwardReverseList)
- && this.reverseForwardList.equals(nw.reverseForwardList)
- && this.reverseReverseList.equals(nw.reverseReverseList) && (this.readId.equals(nw.readId)));
- }
- return false;
- }
-
- @Override
- public String toString() {
- StringBuilder sbuilder = new StringBuilder();
- sbuilder.append('(');
- sbuilder.append(readId.toString()).append('\t');
- sbuilder.append(forwardForwardList.toString()).append('\t');
- sbuilder.append(forwardReverseList.toString()).append('\t');
- sbuilder.append(reverseForwardList.toString()).append('\t');
- sbuilder.append(reverseReverseList.toString()).append('\t').append(')');
- return sbuilder.toString();
- }
-}
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
index 2579029..b1b22ac 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/contrail/graph/ReadsKeyValueParserFactory.java
@@ -308,12 +308,27 @@
tupleBuilder.addField(Node.getreadId().getByteArray(), Node.getreadId().getStartOffset(), Node.getreadId().getLength());
+ tupleBuilder.getDataOutput().writeInt(Node.getFFList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
tupleBuilder.addField(Node.getFFList().getByteArray(), Node.getFFList().getStartOffset(), Node.getFFList()
.getLength());
+
+ tupleBuilder.getDataOutput().writeInt(Node.getFRList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
tupleBuilder.addField(Node.getFRList().getByteArray(), Node.getFRList().getStartOffset(), Node.getFRList()
.getLength());
+
+ tupleBuilder.getDataOutput().writeInt(Node.getRFList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
tupleBuilder.addField(Node.getRFList().getByteArray(), Node.getRFList().getStartOffset(), Node.getRFList()
.getLength());
+
+ tupleBuilder.getDataOutput().writeInt(Node.getRRList().getCountOfPosition());
+ tupleBuilder.addFieldEndOffset();
+
tupleBuilder.addField(Node.getRRList().getByteArray(), Node.getRRList().getStartOffset(), Node.getRRList()
.getLength());
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
index 3084722..3650553 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/dataflow/ReadsKeyValueParserFactory.java
@@ -24,7 +24,7 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import edu.uci.ics.genomix.type.IntermediateNodeWritable;
+import edu.uci.ics.genomix.oldtype.IntermediateNodeWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerListWritable;
import edu.uci.ics.genomix.type.PositionWritable;
diff --git a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
index c4e7063..6026ac1 100644
--- a/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
+++ b/genomix/genomix-hyracks/src/main/java/edu/uci/ics/genomix/hyracks/newgraph/job/JobGenCheckReader.java
@@ -20,8 +20,8 @@
import edu.uci.ics.genomix.hyracks.newgraph.dataflow.ReadsKeyValueParserFactory;
import edu.uci.ics.genomix.hyracks.job.GenomixJobConf;
+import edu.uci.ics.genomix.oldtype.IntermediateNodeWritable;
import edu.uci.ics.genomix.oldtype.PositionWritable;
-import edu.uci.ics.genomix.type.IntermediateNodeWritable;
import edu.uci.ics.genomix.type.KmerBytesWritable;
import edu.uci.ics.genomix.type.KmerListWritable;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
diff --git a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
index bd761a5..4ce59a0 100644
--- a/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
+++ b/genomix/genomix-hyracks/src/test/java/edu/uci/ics/genomix/hyracks/test/JobRunStepByStepTest.java
@@ -75,8 +75,8 @@
@Test
public void TestAll() throws Exception {
- TestReader();
-// TestGroupbyKmer();
+// TestReader();
+ TestGroupbyKmer();
// TestMapKmerToRead();
// TestGroupByReadID();
// TestEndToEnd();
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
index dd37a6a..1a793cd 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/BasicPathMergeVertex.java
@@ -39,13 +39,6 @@
* initiate kmerSize, maxIteration
*/
public void initVertex() {
-// if (kmerSize == -1)
-// kmerSize = getContext().getConfiguration().getInt(KMER_SIZE, 5);
-// if (maxIteration < 0)
-// maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 1000000);
-// outFlag = (byte)0;
-// outgoingMsg.reset();
-// headFlag = (byte)(getVertexValue().getState() & State.IS_HEAD);
}
/**
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 59104f7..05a4700 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -102,8 +102,6 @@
/**
* set nextID to the element that's next (in the node's FF or FR list), returning true when there is a next neighbor
*/
-
-
protected boolean setNextInfo(VertexValueWritable value) {
if (value.getFFList().getCountOfPosition() > 0) {
nextID.set(value.getFFList().getPosition(0));
@@ -121,7 +119,6 @@
/**
* set prevID to the element that's previous (in the node's RR or RF list), returning true when there is a previous neighbor
*/
-
protected boolean setPrevInfo(VertexValueWritable value) {
if (value.getRRList().getCountOfPosition() > 0) {
prevID.set(value.getRRList().getPosition(0));
@@ -169,8 +166,7 @@
} else if (hasPrev && !prevHead) {
// compress this head to the reverse tail
sendUpdateMsgToSuccessor();
- } //else
- //voteToHalt();
+ }
}
}else {
// I'm a tail
@@ -179,24 +175,20 @@
// tails on both sides, and I'm the "local minimum"
// compress me towards the tail in forward dir
sendUpdateMsgToPredecessor();
- } //else
- //voteToHalt();
+ }
} else if (!hasPrev) {
// no previous node
if (!nextHead && curID.compareTo(nextID) < 0) {
// merge towards tail in forward dir
sendUpdateMsgToPredecessor();
- } //else
- //voteToHalt();
+ }
} else if (!hasNext) {
// no next node
if (!prevHead && curID.compareTo(prevID) < 0) {
// merge towards tail in reverse dir
sendUpdateMsgToSuccessor();
- } //else
- //voteToHalt();
- } //else
- //voteToHalt();
+ }
+ }
}
}
else if (getSuperstep() % 4 == 0){