use Counters as partial value to simplify HadoopCountersAggregator
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersAggregator.java
index 85d1876..b0814d9 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersAggregator.java
@@ -14,6 +14,10 @@
*/
package edu.uci.ics.pregelix.api.util;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Counters;
@@ -25,9 +29,59 @@
/**
* A global aggregator that produces a Hadoop mapreduce.Counters object
- *
*/
-public abstract class HadoopCountersAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable> extends
- GlobalAggregator<I, V, E, M, P, Counters> {
+@SuppressWarnings("rawtypes")
+public abstract class HadoopCountersAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable>
+ extends GlobalAggregator<I, V, E, M, Counters, Counters> {
+ private ResettableCounters counters = new ResettableCounters();
+ public Counters getCounters() {
+ return counters;
+ }
+
+ @Override
+ public void init() {
+ counters.reset();
+ }
+
+ @Override
+ public void step(Counters partialResult) {
+ counters.incrAllCounters(partialResult);
+ }
+
+ @Override
+ public Counters finishPartial() {
+ return counters;
+ }
+
+ @Override
+ public Counters finishFinal() {
+ return counters;
+ }
+
+ /**
+ * mapreduce.Counters object that is resettable via .reset()
+ */
+ public static class ResettableCounters extends Counters {
+ private static final DataInputStream zeroStream = new DataInputStream(new InputStream() {
+ @Override
+ public int read() throws IOException {
+ return 0;
+ }
+ });
+
+ /**
+ * Reset this Counters object
+ *
+ * The reset is done by simulating a readFields() from a stream of 0's,
+ * indicating a serialized length of 0 groups. The Counters' cache is not changed.
+ */
+ public void reset() {
+ try {
+ this.readFields(zeroStream);
+ } catch (IOException e) {
+ throw new RuntimeException("Unexpected failure when trying to reset Counters object!", e);
+ }
+ }
+ }
}