add an "iteration complete" hook for aggregation/reporting across iterations

This commit allows the user to specify a class which will be called upon
completion of each pregelix iteration.  This allows us to perform a user-
specified action between iterations.

As an example, a PerIterationGlobalAggregatesHook is provided which
saves the complete set of global aggregator states from every iteration,
allowing the user to observe aggregates from all iterations.

The default hook does nothing.

The hook instance is attached directly to the PregelixJob so that it can
be retrieved by the Driver's caller.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/IIterationCompleteReporterHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/IIterationCompleteReporterHook.java
new file mode 100644
index 0000000..823e8a8
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/IIterationCompleteReporterHook.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.job;
+
+/**
+ * Interface for an object whose {@link completeIteration} method is called at the end
+ * of each pregelix job iteration.
+ * 
+ * This class can be used to extend/replace the simple reporting in pregelix or to
+ * implement aggregation across iterations of a job (rather than having the values
+ * reset after each iteration).
+ * One object is created for each job.
+ * 
+ * @author jake.biesinger
+ */
+public interface IIterationCompleteReporterHook {
+
+    public void completeIteration(int superstep, PregelixJob job);
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index e42f533..0d9c5d3 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -82,10 +82,14 @@
     public static final String CKP_INTERVAL = "pregelix.ckpinterval";
     /** the dynamic optimization */
     public static final String DYNAMIC_OPTIMIZATION = "pregelix.dynamicopt";
+    /** the iteration complete reporter hook */
+    public static final String ITERATION_COMPLETE_CLASS = "pregelix.iterationCompleteReporter";
     /** comma */
     public static final String COMMA_STR = ",";
     /** the names of the aggregator classes active for all vertex types */ 
     public static final String[] DEFAULT_GLOBAL_AGGREGATOR_CLASSES = { GlobalCountAggregator.class.getName() };
+    
+    private IIterationCompleteReporterHook itCompleteHook;
 
     /**
      * Construct a Pregelix job from an existing configuration
@@ -95,6 +99,7 @@
      */
     public PregelixJob(Configuration conf) throws IOException {
         super(conf);
+        itCompleteHook = null;
     }
 
     /**
@@ -106,6 +111,7 @@
      */
     public PregelixJob(String jobName) throws IOException {
         super(new Configuration(), jobName);
+        itCompleteHook = null;
     }
 
     /**
@@ -119,6 +125,7 @@
      */
     public PregelixJob(Configuration conf, String jobName) throws IOException {
         super(conf, jobName);
+        itCompleteHook = null;
     }
 
     /**
@@ -261,6 +268,16 @@
     final public void setCheckpointingInterval(int ckpInterval) {
         getConfiguration().setInt(CKP_INTERVAL, ckpInterval);
     }
+    
+    /**
+     * Users can provide an IIterationCompleteReporterHook implementation to perform actions 
+     * at the end of each iteration  
+     * 
+     * @param reporterClass
+     */
+    final public void setIterationCompleteReporterHook(Class<? extends IIterationCompleteReporterHook> reporterClass) {
+        getConfiguration().setClass(ITERATION_COMPLETE_CLASS, reporterClass, IIterationCompleteReporterHook.class);
+    }
 
     /**
      * Indicate if dynamic optimization is enabled
@@ -271,6 +288,25 @@
         getConfiguration().setBoolean(DYNAMIC_OPTIMIZATION, dynamicOpt);
     }
 
+    /**
+     * Get the IterationCompleteReporterHook for this job.
+     * 
+     * If the job has not yet been run, this value will be null.
+     * 
+     * @return the completion reporter instance or null if the job hasn't been run yet
+     */
+    final public IIterationCompleteReporterHook getIterationCompleteReporterHook() {
+        return itCompleteHook;
+    }
+
+    /**
+     * Pregelix internal use only:
+     *   Set the IterationCompleteReporterHook for this job.
+     */
+    final public void setIterationCompleteReporterHook(IIterationCompleteReporterHook itCompleteHook) {
+        this.itCompleteHook = itCompleteHook;
+    }
+
     @Override
     public String toString() {
         return getJobName();
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index d303969..675fb0e 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -33,6 +33,7 @@
 import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
 import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.job.ICheckpointHook;
+import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 
 /**
@@ -571,6 +572,24 @@
     }
 
     /**
+     * Create a hook that indicates an iteration is complete
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return Instantiated user aggregate value
+     */
+    public static IIterationCompleteReporterHook createIterationCompleteHook(Configuration conf) {
+        Class<? extends IIterationCompleteReporterHook> itCompleteClass = getIterationCompleteReporterHookClass(conf);
+        try {
+            return itCompleteClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new IllegalArgumentException("createVertexPartitioner: Failed to instantiate", e);
+        } catch (IllegalAccessException e) {
+            throw new IllegalArgumentException("createVertexPartitioner: Illegally accessed", e);
+        }
+    }
+
+    /**
      * Get the user's subclassed vertex partitioner class.
      * 
      * @param conf
@@ -595,6 +614,20 @@
     }
 
     /**
+     * Get the user's subclassed iteration complete reporter hook class.
+     * 
+     * @param conf
+     *            Configuration to check
+     * @return The user defined vertex iteration complete reporter class
+     */
+    @SuppressWarnings("unchecked")
+    public static <V extends IIterationCompleteReporterHook> Class<V> getIterationCompleteReporterHookClass(
+            Configuration conf) {
+        return (Class<V>) conf.getClass(PregelixJob.ITERATION_COMPLETE_CLASS,
+                DefaultIterationCompleteReporterHook.class, IIterationCompleteReporterHook.class);
+    }
+
+    /**
      * Get the job configuration parameter whether the vertex states will increase dynamically
      * 
      * @param conf
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
index 4f5fef0..9721589 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/ConservativeCheckpointHook.java
@@ -17,7 +17,7 @@
 import edu.uci.ics.pregelix.api.job.ICheckpointHook;
 
 /**
- * A conservative checkpoint hook which does checkpoint every 5 supersteps
+ * A conservative checkpoint hook which does checkpoint every 2 supersteps
  * 
  * @author yingyib
  */
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java
new file mode 100644
index 0000000..1881429
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/DefaultIterationCompleteReporterHook.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+/**
+ * The default iteration complete reporter hook does nothing
+ * 
+ * @author wbiesing
+ */
+public class DefaultIterationCompleteReporterHook implements IIterationCompleteReporterHook {
+
+    @Override
+    public void completeIteration(int superstep, PregelixJob job) {
+        System.out.println("iteration complete reporter for " + superstep + " job " + job);
+    }
+
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index b3a90e9..4374d85 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -46,6 +46,7 @@
 import edu.uci.ics.hyracks.client.stats.Counters;
 import edu.uci.ics.hyracks.client.stats.impl.ClientCounterContext;
 import edu.uci.ics.pregelix.api.job.ICheckpointHook;
+import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.core.base.IDriver;
@@ -123,6 +124,8 @@
                         /** add hadoop configurations */
                         addHadoopConfiguration(currentJob, ipAddress, port, failed);
                         ICheckpointHook ckpHook = BspUtils.createCheckpointHook(currentJob.getConfiguration());
+                        currentJob.setIterationCompleteReporterHook(BspUtils.createIterationCompleteHook(currentJob
+                                .getConfiguration()));
 
                         /** load the data */
                         if ((i == 0 || compatible(lastJob, currentJob)) && !failed) {
@@ -294,6 +297,7 @@
                 snapshotJobIndex.set(currentJobIndex);
                 snapshotSuperstep.set(i);
             }
+            job.getIterationCompleteReporterHook().completeIteration(i, job);
             i++;
         } while (!terminate);
     }
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 6de65ca..ac1e7f0 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.pregelix.dataflow.util;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -166,5 +167,26 @@
             throw new HyracksDataException(e);
         }
     }
+    
+    public static HashMap<String, Writable> readAllGlobalAggregateValues(Configuration conf, String jobId)
+            throws HyracksDataException {
+        String pathStr = IterationUtils.TMP_DIR + jobId + "agg";
+        Path path = new Path(pathStr);
+        List<Writable> aggValues = BspUtils.createFinalAggregateValues(conf);
+        HashMap<String, Writable> finalAggs = new HashMap<>();
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            FSDataInputStream input = dfs.open(path);
+            for (int i = 0; i < aggValues.size(); i++) {
+                String aggName = input.readUTF();
+                aggValues.get(i).readFields(input);
+                finalAggs.put(aggName, aggValues.get(i));
+            }
+            input.close();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return finalAggs;
+    }
 
 }
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/PerIterationGlobalAggregatesHook.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/PerIterationGlobalAggregatesHook.java
new file mode 100644
index 0000000..bf76657
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/PerIterationGlobalAggregatesHook.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+/**
+ * After each iteration, this hook keeps a record of all the global aggregates in each iteration
+ * 
+ * @author wbiesing
+ */
+public class PerIterationGlobalAggregatesHook implements IIterationCompleteReporterHook {
+    private ArrayList<HashMap<String, Writable>> aggregatesByIteration = new ArrayList<>();
+
+    public List<HashMap<String, Writable>> getAggregatesByIteration() {
+        return aggregatesByIteration;
+    }
+
+    @Override
+    public void completeIteration(int superstep, PregelixJob job) {
+        Configuration conf = job.getConfiguration();
+        HashMap<String, Writable> aggValues;
+        try {
+            aggValues = IterationUtils
+                    .readAllGlobalAggregateValues(conf, BspUtils.getJobId(conf));
+        } catch (HyracksDataException e) {
+            throw new RuntimeException(e);
+        }
+        // forget aggregates that happened later than current iteration (e.g., restarting 
+        // after checkpointing) by trimming the array to size() == superstep - 1
+        while (aggregatesByIteration.size() > superstep) {
+            aggregatesByIteration.remove(aggregatesByIteration.size() - 1);
+        }
+        aggregatesByIteration.add(aggValues);
+    }
+}