add support for Hadoop Counters via job.setCounterAggregatorClass
PregelixJob.setCounterAggregatorClass registers a user-specified global
aggregator and an iterationComplete hook that saves the Counter values.
The user-specified Counters-based aggregator (which must extend
HadoopCountersAggregator) has its value saved to HDFS in each iteration
and should be restart/snapshot-aware.
To enable counters, call job.setCounterAggregatorClass before running
the job. After job completion, the Counters may be retrieved from HDFS
using BspUtils.getCounters(job).
Note that a job currently has only one slot for an iterationComplete
hook, and enabling counters occupies it.
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
index 5ea6413..c1473dc 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/GlobalAggregator.java
@@ -31,7 +31,7 @@
* 2. a final phase which aggregates all partially aggregated states
*
* @param <I extends Writable> vertex identifier type
- * @param <E extends Writable> vertex value type
+ * @param <V extends Writable> vertex value type
* @param <E extends Writable> edge type
* @param <M extends Writable> message type
* @param <P extends Writable>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/IIterationCompleteReporterHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/IIterationCompleteReporterHook.java
index 823e8a8..5254b8c 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/IIterationCompleteReporterHook.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/IIterationCompleteReporterHook.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.pregelix.api.job;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
/**
* Interface for an object whose {@link completeIteration} method is called at the end
* of each pregelix job iteration.
@@ -27,6 +29,6 @@
*/
public interface IIterationCompleteReporterHook {
- public void completeIteration(int superstep, PregelixJob job);
+ public void completeIteration(int superstep, PregelixJob job) throws HyracksDataException;
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 0d9c5d3..f2c9c84 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -16,6 +16,7 @@
package edu.uci.ics.pregelix.api.job;
import java.io.IOException;
+import java.lang.reflect.Modifier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
@@ -26,7 +27,9 @@
import edu.uci.ics.pregelix.api.graph.VertexPartitioner;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
+import edu.uci.ics.pregelix.api.util.HadoopCountersGlobalAggregateHook;
import edu.uci.ics.pregelix.api.util.GlobalCountAggregator;
+import edu.uci.ics.pregelix.api.util.HadoopCountersAggregator;
/**
* This class represents a Pregelix job.
@@ -86,10 +89,12 @@
public static final String ITERATION_COMPLETE_CLASS = "pregelix.iterationCompleteReporter";
/** comma */
public static final String COMMA_STR = ",";
- /** the names of the aggregator classes active for all vertex types */
+ /** period */
+ public static final String PERIOD_STR = ".";
+ /** the names of the aggregator classes active for all vertex types */
public static final String[] DEFAULT_GLOBAL_AGGREGATOR_CLASSES = { GlobalCountAggregator.class.getName() };
-
- private IIterationCompleteReporterHook itCompleteHook;
+ /** The name of an optional class that aggregates all Vertexes into mapreduce.Counters */
+ public static final String COUNTERS_AGGREGATOR_CLASS = "pregelix.aggregatedCountersClass";
/**
* Construct a Pregelix job from an existing configuration
@@ -99,7 +104,6 @@
*/
public PregelixJob(Configuration conf) throws IOException {
super(conf);
- itCompleteHook = null;
}
/**
@@ -111,7 +115,6 @@
*/
public PregelixJob(String jobName) throws IOException {
super(new Configuration(), jobName);
- itCompleteHook = null;
}
/**
@@ -125,7 +128,6 @@
*/
public PregelixJob(Configuration conf, String jobName) throws IOException {
super(conf, jobName);
- itCompleteHook = null;
}
/**
@@ -268,10 +270,10 @@
final public void setCheckpointingInterval(int ckpInterval) {
getConfiguration().setInt(CKP_INTERVAL, ckpInterval);
}
-
+
/**
- * Users can provide an IIterationCompleteReporterHook implementation to perform actions
- * at the end of each iteration
+ * Users can provide an IIterationCompleteReporterHook implementation to perform actions
+ * at the end of each iteration
*
* @param reporterClass
*/
@@ -288,23 +290,13 @@
getConfiguration().setBoolean(DYNAMIC_OPTIMIZATION, dynamicOpt);
}
- /**
- * Get the IterationCompleteReporterHook for this job.
- *
- * If the job has not yet been run, this value will be null.
- *
- * @return the completion reporter instance or null if the job hasn't been run yet
- */
- final public IIterationCompleteReporterHook getIterationCompleteReporterHook() {
- return itCompleteHook;
- }
-
- /**
- * Pregelix internal use only:
- * Set the IterationCompleteReporterHook for this job.
- */
- final public void setIterationCompleteReporterHook(IIterationCompleteReporterHook itCompleteHook) {
- this.itCompleteHook = itCompleteHook;
+ /**
+ * Enable Hadoop Counters for this job by registering a Counters-producing aggregator.
+ *
+ * Records the class under {@link #COUNTERS_AGGREGATOR_CLASS}, adds it as a global
+ * aggregator, and installs {@link HadoopCountersGlobalAggregateHook} as the
+ * iteration-complete reporter hook.
+ * NOTE(review): this presumably replaces any hook set earlier through
+ * setIterationCompleteReporterHook -- confirm against that method.
+ *
+ * @param aggClass
+ *            a concrete subclass of HadoopCountersAggregator
+ * @throws IllegalArgumentException
+ *             if aggClass is abstract
+ */
+ final public void setCounterAggregatorClass(Class<? extends HadoopCountersAggregator<?, ?, ?, ?, ?>> aggClass) {
+ // an abstract aggregator is rejected here, before the job is configured
+ if (Modifier.isAbstract(aggClass.getModifiers())) {
+ throw new IllegalArgumentException("Aggregate class must be a concrete class, not an abstract one! (was " + aggClass.getName() + ")");
+ }
+ getConfiguration().setClass(COUNTERS_AGGREGATOR_CLASS, aggClass, HadoopCountersAggregator.class);
+ addGlobalAggregatorClass(aggClass);
+ setIterationCompleteReporterHook(HadoopCountersGlobalAggregateHook.class);
}
@Override
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 675fb0e..2fcaf91 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -15,14 +15,23 @@
package edu.uci.ics.pregelix.api.util;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.util.ReflectionUtils;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.MsgList;
@@ -41,6 +50,10 @@
* them.
*/
public class BspUtils {
+
+ /** Directory prefix under which per-job aggregate and counter files are stored. */
+ public static final String TMP_DIR = "/tmp/";
+ /** File-name infix for the counters saved at a given iteration; the superstep number is appended. */
+ private static final String COUNTERS_VALUE_ON_ITERATION = ".counters.valueOnIter.";
+ /** File-name suffix for the file recording the last iteration whose counters were saved. */
+ private static final String COUNTERS_LAST_ITERATION_COMPLETED = ".counters.lastIterCompleted";
/**
* Get the user's subclassed {@link VertexInputFormat}.
@@ -732,4 +745,115 @@
public static int getCheckpointingInterval(Configuration conf) {
return conf.getInt(PregelixJob.CKP_INTERVAL, -1);
}
+
+    /**
+     * Read the final value of a single global aggregator from the job's aggregate file
+     * ({@code TMP_DIR + jobId + "agg"}).
+     *
+     * The file is read as a sequence of (aggregator class name, value) records. Records
+     * that precede the requested one are deserialized and discarded so that the stream
+     * stays positioned on record boundaries.
+     *
+     * @param conf
+     *            the job configuration, used to obtain the file system and the aggregator list
+     * @param jobId
+     *            the job identifier used to build the aggregate file path
+     * @param aggClassName
+     *            the class name of the aggregator whose value is wanted
+     * @return the deserialized aggregate value
+     * @throws HyracksDataException
+     *             if opening, reading or closing the file fails
+     * @throws IllegalStateException
+     *             if no record for aggClassName is found
+     */
+    public static Writable readGlobalAggregateValue(Configuration conf, String jobId, String aggClassName)
+            throws HyracksDataException {
+        Path path = new Path(TMP_DIR + jobId + "agg");
+        // try-with-resources closes the stream on every exit path, including a failed
+        // read and the "not found" case, which previously leaked the open stream
+        try (FSDataInputStream input = FileSystem.get(conf).open(path)) {
+            int numOfAggs = createFinalAggregateValues(conf).size();
+            for (int i = 0; i < numOfAggs; i++) {
+                String aggName = input.readUTF();
+                Writable agg = createFinalAggregateValue(conf, aggName);
+                // always consume the value, even when it is not the one requested
+                agg.readFields(input);
+                if (aggName.equals(aggClassName)) {
+                    return agg;
+                }
+            }
+            throw new IllegalStateException("Cannot find the aggregate value for " + aggClassName);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Read every global aggregate value from the job's aggregate file
+     * ({@code TMP_DIR + jobId + "agg"}).
+     *
+     * The i-th record in the file is deserialized into the i-th value object returned by
+     * {@code createFinalAggregateValues(conf)} and stored under the aggregator name read
+     * from the file.
+     *
+     * @param conf
+     *            the job configuration, used to obtain the file system and the aggregator list
+     * @param jobId
+     *            the job identifier used to build the aggregate file path
+     * @return a map from aggregator name to its deserialized value
+     * @throws HyracksDataException
+     *             if opening, reading or closing the file fails
+     */
+    public static HashMap<String, Writable> readAllGlobalAggregateValues(Configuration conf, String jobId)
+            throws HyracksDataException {
+        Path path = new Path(TMP_DIR + jobId + "agg");
+        List<Writable> aggValues = createFinalAggregateValues(conf);
+        HashMap<String, Writable> finalAggs = new HashMap<>();
+        // try-with-resources guarantees the stream is closed even if a read fails midway
+        try (FSDataInputStream input = FileSystem.get(conf).open(path)) {
+            for (Writable aggValue : aggValues) {
+                String aggName = input.readUTF();
+                aggValue.readFields(input);
+                finalAggs.put(aggName, aggValue);
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return finalAggs;
+    }
+
+    /**
+     * Retrieve the Counters saved for a job.
+     *
+     * Looks up the last iteration recorded for the job and returns the Counters
+     * stored for that iteration.
+     *
+     * @param job
+     *            the job whose counters are wanted
+     * @return the Counters saved for the last recorded iteration
+     * @throws HyracksDataException
+     *             if either file cannot be read
+     */
+    public static Counters getCounters(PregelixJob job) throws HyracksDataException {
+        final Configuration jobConf = job.getConfiguration();
+        final String id = getJobId(jobConf);
+        final int finalSuperstep = readCountersLastIteration(jobConf, id);
+        return readCounters(finalSuperstep, jobConf, id);
+    }
+
+    /**
+     * Read the Counters saved for one superstep from
+     * {@code TMP_DIR + jobId + ".counters.valueOnIter." + superstep}.
+     *
+     * @param superstep
+     *            the superstep whose saved counters are wanted
+     * @param conf
+     *            the job configuration, used to obtain the file system
+     * @param jobId
+     *            the job identifier used to build the file path
+     * @return the deserialized Counters
+     * @throws HyracksDataException
+     *             if opening, reading or closing the file fails
+     */
+    static Counters readCounters(int superstep, Configuration conf, String jobId) throws HyracksDataException {
+        Path path = new Path(TMP_DIR + jobId + COUNTERS_VALUE_ON_ITERATION + superstep);
+        Counters savedCounters = new Counters();
+        // try-with-resources closes the stream even when readFields throws
+        try (FSDataInputStream input = FileSystem.get(conf).open(path)) {
+            savedCounters.readFields(input);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return savedCounters;
+    }
+
+    /**
+     * Write the given Counters for one superstep to
+     * {@code TMP_DIR + jobId + ".counters.valueOnIter." + superstep}, overwriting any
+     * existing file at that path.
+     *
+     * @param toWrite
+     *            the Counters to serialize
+     * @param superstep
+     *            the superstep the counters belong to
+     * @param conf
+     *            the job configuration, used to obtain the file system
+     * @param jobId
+     *            the job identifier used to build the file path
+     * @throws HyracksDataException
+     *             if creating, writing or closing the file fails
+     */
+    static void writeCounters(Counters toWrite, int superstep, Configuration conf, String jobId)
+            throws HyracksDataException {
+        Path path = new Path(TMP_DIR + jobId + COUNTERS_VALUE_ON_ITERATION + superstep);
+        // create(path, true) overwrites; try-with-resources closes the stream even when write throws
+        try (FSDataOutputStream output = FileSystem.get(conf).create(path, true)) {
+            toWrite.write(output);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /**
+     * Read the number of the last iteration whose counters were saved, stored at
+     * {@code TMP_DIR + jobId + ".counters.lastIterCompleted"}.
+     *
+     * @param conf
+     *            the job configuration, used to obtain the file system
+     * @param jobId
+     *            the job identifier used to build the file path
+     * @return the superstep number stored in the file
+     * @throws HyracksDataException
+     *             if opening, reading or closing the file fails
+     */
+    static int readCountersLastIteration(Configuration conf, String jobId) throws HyracksDataException {
+        Path path = new Path(TMP_DIR + jobId + COUNTERS_LAST_ITERATION_COMPLETED);
+        IntWritable lastIter = new IntWritable();
+        // try-with-resources closes the stream even when readFields throws
+        try (FSDataInputStream input = FileSystem.get(conf).open(path)) {
+            lastIter.readFields(input);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return lastIter.get();
+    }
+
+    /**
+     * Record the number of the last iteration whose counters were saved, at
+     * {@code TMP_DIR + jobId + ".counters.lastIterCompleted"}, overwriting any existing file.
+     *
+     * @param superstep
+     *            the superstep number to record
+     * @param conf
+     *            the job configuration, used to obtain the file system
+     * @param jobId
+     *            the job identifier used to build the file path
+     * @throws HyracksDataException
+     *             if creating, writing or closing the file fails
+     */
+    static void writeCountersLastIteration(int superstep, Configuration conf, String jobId) throws HyracksDataException {
+        Path path = new Path(TMP_DIR + jobId + COUNTERS_LAST_ITERATION_COMPLETED);
+        // create(path, true) overwrites; try-with-resources closes the stream even when write throws
+        try (FSDataOutputStream output = FileSystem.get(conf).create(path, true)) {
+            new IntWritable(superstep).write(output);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersAggregator.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersAggregator.java
new file mode 100644
index 0000000..85d1876
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersAggregator.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Counters;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+
+/**
+ * A global aggregator that produces a Hadoop mapreduce.Counters object
+ *
+ * This class adds no members of its own; it only fixes the last type argument of
+ * {@link GlobalAggregator} to {@link Counters}. Subclasses supply the aggregation logic.
+ *
+ * @param <I> vertex identifier type
+ * @param <V> vertex value type
+ * @param <E> edge type
+ * @param <M> message type
+ * @param <P> passed through unchanged to GlobalAggregator
+ *            (NOTE(review): its meaning is not visible here -- see GlobalAggregator)
+ */
+public abstract class HadoopCountersAggregator<I extends WritableComparable, V extends Writable, E extends Writable, M extends WritableSizable, P extends Writable> extends
+ GlobalAggregator<I, V, E, M, P, Counters> {
+
+}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersGlobalAggregateHook.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersGlobalAggregateHook.java
new file mode 100644
index 0000000..35ff22c
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/HadoopCountersGlobalAggregateHook.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+/**
+ * Hook that aggregates a mapreduce.Counters object across all
+ * iterations of a job, saving to HDFS
+ *
+ * After each iteration the hook reads the Counters produced by the configured
+ * HadoopCountersAggregator, adds the running total saved for the previous
+ * iteration, and writes the new running total together with the iteration number.
+ * The hook keeps no state between calls; everything it needs is re-read from the
+ * saved files.
+ *
+ * @author wbiesing
+ */
+public class HadoopCountersGlobalAggregateHook implements IIterationCompleteReporterHook {
+
+    /** Reported when no counters aggregator is configured or its value is missing from the aggregate file. */
+    private static final String NO_AGGREGATOR_MSG = "A subclass of HadoopCountersAggregator must be active for HadoopCountersGlobalAggregateHook to operate!";
+
+    /**
+     * Fold this iteration's Counters into the running total and save the result.
+     *
+     * @param superstep
+     *            the iteration that has just completed
+     * @param job
+     *            the running job; its configuration names the counters aggregator class
+     * @throws HyracksDataException
+     *             if no counters aggregator is configured, its value cannot be found,
+     *             or reading/writing the saved counters fails
+     */
+    @Override
+    public void completeIteration(int superstep, PregelixJob job) throws HyracksDataException {
+        Configuration conf = job.getConfiguration();
+        String jobId = BspUtils.getJobId(conf);
+        Class<?> aggClass = conf.getClass(PregelixJob.COUNTERS_AGGREGATOR_CLASS, null);
+        if (aggClass == null) {
+            throw new HyracksDataException(NO_AGGREGATOR_MSG);
+        }
+        Counters curIterCounters;
+        try {
+            curIterCounters = (Counters) BspUtils.readGlobalAggregateValue(conf, jobId, aggClass.getName());
+        } catch (IllegalStateException e) {
+            // the aggregate file holds no entry for the configured aggregator class
+            throw new HyracksDataException(NO_AGGREGATOR_MSG, e);
+        }
+        // only supersteps after the first have a previously saved total to add
+        if (superstep > 1) {
+            // the counters from the previous iterations, all aggregated together
+            Counters prevCounters = BspUtils.readCounters(superstep - 1, conf, jobId);
+            // add the previous totals to this iteration's counters
+            curIterCounters.incrAllCounters(prevCounters);
+        }
+        BspUtils.writeCounters(curIterCounters, superstep, conf, jobId);
+        BspUtils.writeCountersLastIteration(superstep, conf, jobId);
+    }
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 4374d85..fdb4136 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -124,8 +124,6 @@
/** add hadoop configurations */
addHadoopConfiguration(currentJob, ipAddress, port, failed);
ICheckpointHook ckpHook = BspUtils.createCheckpointHook(currentJob.getConfiguration());
- currentJob.setIterationCompleteReporterHook(BspUtils.createIterationCompleteHook(currentJob
- .getConfiguration()));
/** load the data */
if ((i == 0 || compatible(lastJob, currentJob)) && !failed) {
@@ -280,6 +278,9 @@
loadData(job, jobGen, deploymentId);
}
}
+ // TODO how long should the hook persist? One per job? Or one per recovery attempt?
+ // since the hook shouldn't be stateful, we do one per recovery attempt
+ IIterationCompleteReporterHook itCompleteHook = BspUtils.createIterationCompleteHook(job.getConfiguration());
int i = doRecovery ? snapshotSuperstep.get() + 1 : 1;
int ckpInterval = BspUtils.getCheckpointingInterval(job.getConfiguration());
boolean terminate = false;
@@ -297,7 +298,7 @@
snapshotJobIndex.set(currentJobIndex);
snapshotSuperstep.set(i);
}
- job.getIterationCompleteReporterHook().completeIteration(i, job);
+ itCompleteHook.completeIteration(i, job);
i++;
} while (!terminate);
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index ac1e7f0..d834868 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -37,7 +37,7 @@
import edu.uci.ics.pregelix.dataflow.context.TaskIterationID;
public class IterationUtils {
- public static final String TMP_DIR = "/tmp/";
+ public static final String TMP_DIR = BspUtils.TMP_DIR;
public static void setIterationState(IHyracksTaskContext ctx, String pregelixJobId, int partition, int iteration,
IStateObject state) {
@@ -144,49 +144,13 @@
}
public static Writable readGlobalAggregateValue(Configuration conf, String jobId, String aggClassName)
- throws HyracksDataException {
- try {
- FileSystem dfs = FileSystem.get(conf);
- String pathStr = IterationUtils.TMP_DIR + jobId + "agg";
- Path path = new Path(pathStr);
- FSDataInputStream input = dfs.open(path);
- int numOfAggs = BspUtils.createFinalAggregateValues(conf).size();
- for (int i = 0; i < numOfAggs; i++) {
- String aggName = input.readUTF();
- Writable agg = BspUtils.createFinalAggregateValue(conf, aggName);
- if (aggName.equals(aggClassName)) {
- agg.readFields(input);
- input.close();
- return agg;
- } else {
- agg.readFields(input);
- }
- }
- throw new IllegalStateException("Cannot find the aggregate value for " + aggClassName);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
+ throws HyracksDataException {
+ return BspUtils.readGlobalAggregateValue(conf, jobId, aggClassName);
}
public static HashMap<String, Writable> readAllGlobalAggregateValues(Configuration conf, String jobId)
- throws HyracksDataException {
- String pathStr = IterationUtils.TMP_DIR + jobId + "agg";
- Path path = new Path(pathStr);
- List<Writable> aggValues = BspUtils.createFinalAggregateValues(conf);
- HashMap<String, Writable> finalAggs = new HashMap<>();
- try {
- FileSystem dfs = FileSystem.get(conf);
- FSDataInputStream input = dfs.open(path);
- for (int i = 0; i < aggValues.size(); i++) {
- String aggName = input.readUTF();
- aggValues.get(i).readFields(input);
- finalAggs.put(aggName, aggValues.get(i));
- }
- input.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- return finalAggs;
+ throws HyracksDataException {
+ return BspUtils.readAllGlobalAggregateValues(conf, jobId);
}
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/PerIterationGlobalAggregatesHook.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/PerIterationGlobalAggregatesHook.java
deleted file mode 100644
index bf76657..0000000
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/PerIterationGlobalAggregatesHook.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.dataflow.util;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.pregelix.api.job.IIterationCompleteReporterHook;
-import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.api.util.BspUtils;
-
-/**
- * After each iteration, this hook keeps a record of all the global aggregates in each iteration
- *
- * @author wbiesing
- */
-public class PerIterationGlobalAggregatesHook implements IIterationCompleteReporterHook {
- private ArrayList<HashMap<String, Writable>> aggregatesByIteration = new ArrayList<>();
-
- public List<HashMap<String, Writable>> getAggregatesByIteration() {
- return aggregatesByIteration;
- }
-
- @Override
- public void completeIteration(int superstep, PregelixJob job) {
- Configuration conf = job.getConfiguration();
- HashMap<String, Writable> aggValues;
- try {
- aggValues = IterationUtils
- .readAllGlobalAggregateValues(conf, BspUtils.getJobId(conf));
- } catch (HyracksDataException e) {
- throw new RuntimeException(e);
- }
- // forget aggregates that happened later than current iteration (e.g., restarting
- // after checkpointing) by trimming the array to size() == superstep - 1
- while (aggregatesByIteration.size() > superstep) {
- aggregatesByIteration.remove(aggregatesByIteration.size() - 1);
- }
- aggregatesByIteration.add(aggValues);
- }
-}