Aggregator passes test
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
index cbe7852..7b82dc1 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/BasicGraphCleanVertex.java
@@ -1,13 +1,19 @@
package edu.uci.ics.genomix.pregelix.operator;
+import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.genomix.pregelix.io.ByteWritable;
+import edu.uci.ics.genomix.pregelix.io.HashMapWritable;
import edu.uci.ics.genomix.pregelix.io.MessageWritable;
+import edu.uci.ics.genomix.pregelix.io.VLongWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
@@ -692,4 +698,13 @@
outgoingEdgeDir = connectedTable[i][1];
}
+ public static HashMapWritable<ByteWritable, VLongWritable> readStatisticsCounterResult(Configuration conf) {
+ try {
+ VertexValueWritable value = (VertexValueWritable) IterationUtils
+ .readGlobalAggregateValue(conf, BspUtils.getJobId(conf));
+ return value.getCounters();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
}
\ No newline at end of file
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/aggregator/StatisticsAggregator.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/aggregator/StatisticsAggregator.java
index 1c81be4..7062cfe 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/aggregator/StatisticsAggregator.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/aggregator/StatisticsAggregator.java
@@ -15,6 +15,7 @@
public class StatisticsAggregator extends
GlobalAggregator<VKmerBytesWritable, VertexValueWritable, NullWritable, MessageWritable, VertexValueWritable, VertexValueWritable>{
+ public static HashMapWritable<ByteWritable, VLongWritable> preGlobalCounters = new HashMapWritable<ByteWritable, VLongWritable>();
private VertexValueWritable value = new VertexValueWritable();
@Override
@@ -53,6 +54,7 @@
@Override
public VertexValueWritable finishFinal() {
+ updateAggregateState(preGlobalCounters);
return value;
}
diff --git a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
index 9b0241d..57eb2f2 100644
--- a/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
+++ b/genomix/genomix-pregelix/src/main/java/edu/uci/ics/genomix/pregelix/operator/pathmerge/P4ForPathMergeVertex.java
@@ -19,6 +19,8 @@
import edu.uci.ics.genomix.pregelix.io.VLongWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
import edu.uci.ics.genomix.pregelix.io.VertexValueWritable.State;
+import edu.uci.ics.genomix.pregelix.operator.BasicGraphCleanVertex;
+import edu.uci.ics.genomix.pregelix.operator.aggregator.StatisticsAggregator;
import edu.uci.ics.genomix.pregelix.type.MessageFlag;
import edu.uci.ics.genomix.pregelix.type.StatisticsCounter;
import edu.uci.ics.genomix.type.VKmerBytesWritable;
@@ -105,6 +107,8 @@
if(repeatKmer == null)
repeatKmer = new VKmerBytesWritable();
tmpValue.reset();
+ if(getSuperstep() > 1)
+ StatisticsAggregator.preGlobalCounters = BasicGraphCleanVertex.readStatisticsCounterResult(getContext().getConfiguration());
}
protected boolean isNodeRandomHead(VKmerBytesWritable nodeKmer) {
@@ -164,18 +168,21 @@
public void compute(Iterator<PathMergeMessageWritable> msgIterator) {
initVertex();
counters.clear();
+ getVertexValue().getCounters().clear();
if(getSuperstep() == 1){
- if(getVertexId().toString().contains("AT") || getVertexId().toString().contains("GA")){
+// if(getVertexId().toString().contains("AT") || getVertexId().toString().contains("GA")){
updateStatisticsCounter(StatisticsCounter.MergedNodes);
// updateStatisticsCounter(StatisticsCounter.MergedPaths);
getVertexValue().setCounters(counters);
activate();
- }
+// }
} else if(getSuperstep() == 2){
if(getVertexId().toString().contains("AA")){
updateStatisticsCounter(StatisticsCounter.MergedNodes);
- getVertexValue().setCounters(counters);
+ } else{
+ updateStatisticsCounter(StatisticsCounter.MergedPaths);
}
+ getVertexValue().setCounters(counters);
voteToHalt();
}
// else{
diff --git a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java
index 4714ab3..4d53925 100644
--- a/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java
+++ b/genomix/genomix-pregelix/src/test/java/edu/uci/ics/genomix/pregelix/JobRun/BasicSmallTestCase.java
@@ -15,28 +15,24 @@
package edu.uci.ics.genomix.pregelix.JobRun;
-import java.io.IOException;
-
import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
-import edu.uci.ics.genomix.config.GenomixJobConf;
import edu.uci.ics.genomix.pregelix.graph.GenerateGraphViz;
+import edu.uci.ics.genomix.pregelix.io.ByteWritable;
import edu.uci.ics.genomix.pregelix.io.HashMapWritable;
-import edu.uci.ics.genomix.pregelix.io.VertexValueWritable;
+import edu.uci.ics.genomix.pregelix.io.VLongWritable;
+import edu.uci.ics.genomix.pregelix.operator.BasicGraphCleanVertex;
import edu.uci.ics.genomix.pregelix.sequencefile.GenerateTextFile;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.core.base.IDriver.Plan;
import edu.uci.ics.pregelix.core.driver.Driver;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
-import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
public class BasicSmallTestCase extends TestCase {
private final PregelixJob job;
@@ -70,16 +66,6 @@
}
}
- private static HashMapWritable readStatisticsCounterResult(Configuration conf) {
- try {
- VertexValueWritable value = (VertexValueWritable) IterationUtils
- .readGlobalAggregateValue(conf, BspUtils.getJobId(conf));
- return value.getCounters();
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
-
@Test
public void test() throws Exception {
setUp();
@@ -87,7 +73,7 @@
for (Plan plan : plans) {
driver.runJob(job, plan, PregelixHyracksIntegrationUtil.CC_HOST,
PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
- HashMapWritable counters = readStatisticsCounterResult(job.getConfiguration());
+ HashMapWritable<ByteWritable, VLongWritable> counters = BasicGraphCleanVertex.readStatisticsCounterResult(job.getConfiguration());
System.out.println(counters.toString());
}
compareResults();
diff --git a/genomix/genomix-pregelix/src/test/resources/cluster/cluster.properties b/genomix/genomix-pregelix/src/test/resources/cluster/cluster.properties
index 0c6abd1..025bc62 100644
--- a/genomix/genomix-pregelix/src/test/resources/cluster/cluster.properties
+++ b/genomix/genomix-pregelix/src/test/resources/cluster/cluster.properties
@@ -20,7 +20,7 @@
NCLOGS_DIR=$NCTMP_DIR/logs
#Comma separated I/O directories for the spilling of external sort
-IO_DIRS="/tmp/t3,/tmp/t4,/tmp/t5,/tmp/t6"
+IO_DIRS="/tmp/t3"
#The JAVA_HOME
JAVA_HOME=$JAVA_HOME
diff --git a/genomix/genomix-pregelix/src/test/resources/cluster/stores.properties b/genomix/genomix-pregelix/src/test/resources/cluster/stores.properties
index 2daf1ee..5fdc25e 100644
--- a/genomix/genomix-pregelix/src/test/resources/cluster/stores.properties
+++ b/genomix/genomix-pregelix/src/test/resources/cluster/stores.properties
@@ -1 +1 @@
-store=teststore1,teststore2,teststore3,teststore4,
\ No newline at end of file
+store=teststore1
\ No newline at end of file