gc optimize h4 reduce
diff --git a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
index 1fef016..a5ba50d 100644
--- a/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
+++ b/genomix/genomix-hadoop/src/main/java/edu/uci/ics/genomix/hadoop/graphclean/mergepaths/h4/MergePathsH4.java
@@ -290,11 +290,15 @@
private NodeWithFlagWritable outputValue;
private NodeWritable curNode;
private PositionWritable outPosn;
- private ArrayList<NodeWithFlagWritable> updateMsgs;
private boolean sawCurNode;
private byte outFlag;
private byte inFlag;
+ // to prevent GC on update messages, we keep them all in one list and use the Node set method rather than creating new Node's
+ private ArrayList<NodeWithFlagWritable> updateMsgs;
+ private int updateMsgsSize;
+ private int updateMsgsCount;
+
public void configure(JobConf conf) {
KMER_SIZE = conf.getInt("sizeKmer", 0);
inputValue = new NodeWithFlagWritable(KMER_SIZE);
@@ -302,6 +306,7 @@
curNode = new NodeWritable(KMER_SIZE);
outPosn = new PositionWritable();
updateMsgs = new ArrayList<NodeWithFlagWritable>();
+ updateMsgsSize = updateMsgs.size();
}
/*
@@ -315,14 +320,14 @@
public void reduce(PositionWritable key, Iterator<NodeWithFlagWritable> values,
OutputCollector<PositionWritable, NodeWithFlagWritable> output, Reporter reporter) throws IOException {
sawCurNode = false;
- updateMsgs.clear();
-
+ updateMsgsCount = 0;
+
byte inDir;
while (values.hasNext()) {
inputValue.set(values.next());
inFlag = inputValue.getFlag();
inDir = (byte) (inFlag & MessageFlag.MSG_MASK);
-
+
switch (inDir) {
case MessageFlag.MSG_UPDATE_MERGE:
case MessageFlag.MSG_SELF:
@@ -334,13 +339,13 @@
sawCurNode = true;
if (inDir == MessageFlag.MSG_SELF) {
outPosn.set(curNode.getNodeID());
- } else { // MSG_UPDATE_MERGE
+ } else { // MSG_UPDATE_MERGE
// merge messages are sent to their merge recipient
outPosn.set(curNode.getListFromDir(inDir).getPosition(0));
}
break;
case MessageFlag.MSG_UPDATE_EDGE:
- updateMsgs.add(new NodeWithFlagWritable(inputValue)); // make a copy of inputValue-- not a reference!
+ addUpdateMessage(inputValue);
break;
default:
throw new IOException("Unrecognized message type: " + (inFlag & MessageFlag.MSG_MASK));
@@ -349,15 +354,23 @@
// process all the update messages for this node
// I have no idea how to make this more efficient...
- for (NodeWithFlagWritable updateMsg : updateMsgs) {
- NodeWithFlagWritable.processUpdates(curNode, updateMsg, KMER_SIZE);
+ for (int i=0; i < updateMsgsCount; i++) {
+ NodeWithFlagWritable.processUpdates(curNode, updateMsgs.get(i), KMER_SIZE);
}
outputValue.set(outFlag, curNode);
output.collect(outPosn, outputValue);
}
+
+ private void addUpdateMessage(NodeWithFlagWritable myInputValue) {
+ updateMsgsCount++;
+ if (updateMsgsCount >= updateMsgsSize) {
+ updateMsgs.add(new NodeWithFlagWritable(inputValue)); // make a copy of inputValue-- not a reference!
+ } else {
+ updateMsgs.get(updateMsgsCount - 1).set(myInputValue); // update existing reference
+ }
+ }
}
-
-
+
/*
* Mapper class: sends the update messages to their (already decided) destination
*/
@@ -413,15 +426,11 @@
inFlag = value.getFlag();
curNode.set(value.getNode());
curID.set(curNode.getNodeID());
-
+
}
}
-
-
-
-
/*
* Reducer class: processes the update messages from updateMapper
*/