support large-size global aggreate values
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 51a9ce3..d68ad2c 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -538,4 +538,15 @@
     public static boolean useLSM(Configuration conf) {
         return conf.getBoolean(PregelixJob.UPDATE_INTENSIVE, false);
     }
+
+    /***
+     * Get the spilling dir name for global aggregates
+     * 
+     * @param conf
+     * @param superStep
+     * @return the spilling dir name
+     */
+    public static String getGlobalAggregateSpillingDirName(Configuration conf, long superStep) {
+        return "/tmp/pregelix/agg/" + conf.get(PregelixJob.JOB_ID) + "/" + superStep;
+    }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
index 24105ae..a0f67e3 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
@@ -15,6 +15,14 @@
 
 package edu.uci.ics.pregelix.api.util;
 
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -41,4 +49,19 @@
         }
     }
 
+    public static void flushTupleToHDFS(ArrayTupleBuilder atb, Configuration conf, long superStep)
+            throws HyracksDataException {
+        try {
+            if (atb.getSize()>0) {
+                FileSystem dfs = FileSystem.get(conf);
+                String fileName = BspUtils.getGlobalAggregateSpillingDirName(conf, superStep) +"/" + UUID.randomUUID();
+                FSDataOutputStream dos = dfs.create(new Path(fileName), true);
+                dos.write(atb.getByteArray(), 0, atb.getSize());
+                dos.flush();
+                dos.close();
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 3e6e9a5..7bd2cf8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -99,7 +99,7 @@
             IntWritable lastSnapshotSuperstep = new IntWritable(0);
             boolean failed = false;
             int retryCount = 0;
-            int maxRetryCount = 3;
+            int maxRetryCount = 1;
 
             do {
                 try {
@@ -142,12 +142,11 @@
                     //restart from snapshot
                     failed = true;
                     retryCount++;
-                    ioe.printStackTrace();
+                    throw new HyracksException(ioe);
                 }
             } while (failed && retryCount < maxRetryCount);
             LOG.info("job finished");
         } catch (Exception e) {
-            e.printStackTrace();
             throw new HyracksException(e);
         }
     }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index d37c6fd..36723a6 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -108,11 +108,13 @@
 import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
 import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
 import edu.uci.ics.pregelix.runtime.bootstrap.VirtualBufferCacheProvider;
+import edu.uci.ics.pregelix.runtime.touchpoint.RecoveryRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexPartitionComputerFactory;
@@ -374,9 +376,10 @@
     @Override
     public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
         try {
+
             PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
             tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
-            FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath));
+            FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath + "/" + lastSuccessfulIteration));
             tmpJob.setOutputKeyClass(NullWritable.class);
             tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
             JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
@@ -409,7 +412,7 @@
         try {
             PregelixJob tmpJob = this.createCloneJob("Vertex checkpoint loading for job " + jobId, pregelixJob);
             tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
-            FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath));
+            FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath + "/" + lastCheckpointedIteration));
             JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
             JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, tmpJob);
             JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
@@ -637,7 +640,7 @@
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message";
+        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastSuccessfulIteration;
         PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
         tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
         FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
@@ -657,7 +660,7 @@
     @SuppressWarnings({ "unchecked", "rawtypes" })
     protected JobSpecification[] generateStateCheckpointLoading(int lastCheckpointedIteration, PregelixJob job)
             throws HyracksException {
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message";
+        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastCheckpointedIteration;
         PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
         tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
         try {
@@ -696,6 +699,12 @@
                 recordDescriptor);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration + 1, new ConfigurationFactory(
+                        pregelixJob.getConfiguration())));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
         ClusterConfig.setLocationConstraint(spec, emptySink);
@@ -706,7 +715,8 @@
         ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
                 materialize, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, emptySink, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
         spec.setFrameSize(frameSize);
         return new JobSpecification[] { spec };
     }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 58384b2..41887c0 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -529,7 +529,7 @@
         tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
 
         /** generate secondary index checkpoint */
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary";
+        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;
         FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
         tmpJob.setOutputKeyClass(BspUtils.getVertexIndexClass(tmpJob.getConfiguration()));
         tmpJob.setOutputValueClass(MsgList.class);
@@ -569,7 +569,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
         JobSpecification spec = new JobSpecification();
 
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary";
+        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;;
         PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
         tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
         try {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index e8a6b5c..e1795de 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -61,6 +61,8 @@
         ccConfig.defaultMaxJobAttempts = 0;
         ccConfig.jobHistorySize = 1;
         ccConfig.profileDumpPeriod = -1;
+        //ccConfig.heartbeatPeriod = 5000;
+        //ccConfig.maxHeartbeatLapsePeriods = 1;
 
         // cluster controller
         cc = new ClusterControllerService(ccConfig);
@@ -74,7 +76,7 @@
         ncConfig1.dataIPAddress = "127.0.0.1";
         ncConfig1.datasetIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
-        ncConfig1.ioDevices="dev1,dev2";
+        ncConfig1.ioDevices = "dev1,dev2";
         ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -87,7 +89,7 @@
         ncConfig2.datasetIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         ncConfig2.appNCMainClass = NCApplicationEntryPoint.class.getName();
-        ncConfig2.ioDevices="dev3,dev4";
+        ncConfig2.ioDevices = "dev3,dev4";
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
 
@@ -96,6 +98,14 @@
         ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
     }
 
+    public static void showDownNC1() throws Exception {
+        nc1.stop();
+    }
+
+    public static void showDownNC2() throws Exception {
+        nc2.stop();
+    }
+
     public static void deinit() throws Exception {
         nc2.stop();
         nc1.stop();
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
index 3fed609..c0be9dd 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
@@ -17,9 +17,13 @@
 
 import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -32,6 +36,7 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
@@ -93,10 +98,28 @@
 
             }
 
+            @SuppressWarnings("unchecked")
             @Override
             public void close() throws HyracksDataException {
-                Writable finalAggregateValue = aggregator.finishFinal();
-                IterationUtils.writeGlobalAggregateValue(conf, jobId, finalAggregateValue);
+                try {
+                    // iterate over hdfs spilled aggregates
+                    FileSystem dfs = FileSystem.get(conf);
+                    String spillingDir = BspUtils.getGlobalAggregateSpillingDirName(conf, Vertex.getSuperstep());
+                    FileStatus[] files = dfs.listStatus(new Path(spillingDir));
+                    if (files != null) {
+                        // goes into this branch only when there are spilled files
+                        for (int i = 0; i < files.length; i++) {
+                            FileStatus file = files[i];
+                            DataInput dis = dfs.open(file.getPath());
+                            partialAggregateValue.readFields(dis);
+                            aggregator.step(partialAggregateValue);
+                        }
+                    }
+                    Writable finalAggregateValue = aggregator.finishFinal();
+                    IterationUtils.writeGlobalAggregateValue(conf, jobId, finalAggregateValue);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
             }
 
         };
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index b74a5de..9a21680 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -168,7 +168,8 @@
                 FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
                     @Override
                     public boolean accept(Path dir) {
-                        return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
+                        return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0
+                                && dir.getName().indexOf(".crc") < 0;
                     }
                 });
                 return results;
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 496d066..a366899 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -61,7 +61,7 @@
     private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
     private final Map<String, Long> jobIdToSuperStep = new ConcurrentHashMap<String, Long>();
     private final Map<String, Boolean> jobIdToMove = new ConcurrentHashMap<String, Boolean>();
-    
+
     private final ThreadFactory threadFactory = new ThreadFactory() {
         public Thread newThread(Runnable r) {
             return new Thread(r);
@@ -145,7 +145,7 @@
         return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
     }
 
-    public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges) {
+    public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, int currentIteration) {
         Boolean toMove = jobIdToMove.get(jobId);
         if (toMove == null || toMove == true) {
             if (jobIdToSuperStep.get(jobId) == null) {
@@ -159,7 +159,11 @@
                     fileRef.delete();
             }
 
-            Vertex.setSuperstep(++superStep);
+            if (currentIteration > 0) {
+                Vertex.setSuperstep(currentIteration);
+            } else {
+                Vertex.setSuperstep(++superStep);
+            }
             Vertex.setNumVertices(numVertices);
             Vertex.setNumEdges(numEdges);
             jobIdToSuperStep.put(jobId, superStep);
@@ -169,8 +173,8 @@
         System.gc();
     }
 
-    public synchronized void endSuperStep(String giraphJobId) {
-        jobIdToMove.put(giraphJobId, true);
+    public synchronized void endSuperStep(String pregelixJobId) {
+        jobIdToMove.put(pregelixJobId, true);
         LOGGER.info("end iteration " + Vertex.getSuperstep());
     }
 
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 603a464..1cf81ac 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -75,11 +75,11 @@
         context.endSuperStep(giraphJobId);
     }
 
-    public static void setProperties(String giraphJobId, IHyracksTaskContext ctx, Configuration conf) {
+    public static void setProperties(String jobId, IHyracksTaskContext ctx, Configuration conf, int currentIteration) {
         INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
         RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-        context.setVertexProperties(giraphJobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
-                conf.getLong(PregelixJob.NUM_EDGES, -1));
+        context.setVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+                conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
     }
 
     public static void writeTerminationState(Configuration conf, String jobId, boolean terminate)
diff --git a/pregelix/pregelix-example/pom.xml b/pregelix/pregelix-example/pom.xml
index 1066e3b..9994c0e 100644
--- a/pregelix/pregelix-example/pom.xml
+++ b/pregelix/pregelix-example/pom.xml
@@ -94,7 +94,7 @@
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-clean-plugin</artifactId>
-				<version>2.4.1</version>
+				<version>2.5</version>
 				<configuration>
 					<filesets>
 						<fileset>
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
new file mode 100644
index 0000000..dac087f
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class FailureRecoveryTest {
+    private static String INPUTPATH = "data/webmap";
+    private static String OUTPUTPAH = "actual/result";
+    private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+
+    @Test
+    public void test() throws Exception {
+        TestCluster testCluster = new TestCluster();
+
+        try {
+            PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+            job.setVertexClass(PageRankVertex.class);
+            job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+            job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+            job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+            job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+            FileInputFormat.setInputPaths(job, INPUTPATH);
+            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+            job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+            job.setCheckpointHook(ConservativeCheckpointHook.class);
+
+            testCluster.setUp();
+            Driver driver = new Driver(PageRankVertex.class);
+            //            Thread thread = new Thread(new Runnable() {
+            //
+            //                @Override
+            //                public void run() {
+            //                    try {
+            //                        synchronized (this) {
+            //                            this.wait(10000);
+            //                            PregelixHyracksIntegrationUtil.showDownNC1();
+            //                        }
+            //                    } catch (Exception e) {
+            //                        throw new IllegalStateException(e);
+            //                    }
+            //                }
+            //
+            //            });
+            //thread.start();
+            driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+            TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+        } catch (Exception e) {
+            throw e;
+        } finally {
+            testCluster.tearDown();
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertexTest.java
similarity index 96%
rename from pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertexTest.java
index 5a2636a..a2d32c0 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertexTest.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.pregelix.example.test;
+package edu.uci.ics.pregelix.example;
 
 import junit.framework.Assert;
 
@@ -29,6 +29,7 @@
 import edu.uci.ics.pregelix.example.FailureVertex;
 import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
 
 /**
  * This test case tests the error message propagation.
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
similarity index 91%
rename from pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
index d2995f1..5a485ba 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.pregelix.example.test;
+package edu.uci.ics.pregelix.example;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -24,13 +24,12 @@
 import org.junit.Test;
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
 import edu.uci.ics.pregelix.core.driver.Driver;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
-import edu.uci.ics.pregelix.example.PageRankVertex;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
 import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
 import edu.uci.ics.pregelix.example.util.TestUtils;
 
 /**
@@ -56,7 +55,7 @@
             job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
             FileInputFormat.setInputPaths(job1, INPUTPATH);
             job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
-            job1.setCheckpointHook(ConservativeCheckpointHook.class);
+            //job1.setCheckpointHook(ConservativeCheckpointHook.class);
 
             PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
             job2.setVertexClass(PageRankVertex.class);
@@ -66,7 +65,7 @@
             job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
             FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH));
             job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
-            job2.setCheckpointHook(ConservativeCheckpointHook.class);
+            //job2.setCheckpointHook(ConservativeCheckpointHook.class);
 
             jobs.add(job1);
             jobs.add(job2);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/OverflowAggregatorTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/OverflowAggregatorTest.java
new file mode 100644
index 0000000..474d0a6
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/OverflowAggregatorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.aggregator.OverflowAggregator;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class OverflowAggregatorTest {
+
+    private static String INPUTPATH = "data/webmap";
+    private static String OUTPUTPAH = "actual/result";
+    private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+
+    @Test
+    public void test() throws Exception {
+        TestCluster testCluster = new TestCluster();
+
+        try {
+            PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+            job.setVertexClass(PageRankVertex.class);
+            job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+            job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+            job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+            job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+            FileInputFormat.setInputPaths(job, INPUTPATH);
+            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+            job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+            job.setGlobalAggregatorClass(OverflowAggregator.class);
+
+            testCluster.setUp();
+            Driver driver = new Driver(PageRankVertex.class);
+            driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+            TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+            Text text = (Text) IterationUtils.readGlobalAggregateValue(job.getConfiguration(),
+                    BspUtils.getJobId(job.getConfiguration()));
+            Assert.assertEquals(text.getLength(), 20 * 32767);
+        } catch (Exception e) {
+            throw e;
+        } finally {
+            testCluster.tearDown();
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/aggregator/OverflowAggregator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/aggregator/OverflowAggregator.java
new file mode 100644
index 0000000..34b8b51
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/aggregator/OverflowAggregator.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.aggregator;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Test the case where the global aggregate's state is bloated
+ * 
+ * @author yingyib
+ */
+public class OverflowAggregator extends
+        GlobalAggregator<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable, Text, Text> {
+
+    private int textLength = 0;
+    private int inc = 32767;
+
+    @Override
+    public void init() {
+        textLength = 0;
+    }
+
+    @Override
+    public void step(Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> v)
+            throws HyracksDataException {
+        textLength += inc;
+    }
+
+    @Override
+    public void step(Text partialResult) {
+        textLength += partialResult.getLength();
+    }
+
+    @Override
+    public Text finishPartial() {
+        byte[] partialResult = new byte[textLength];
+        for (int i = 0; i < partialResult.length; i++) {
+            partialResult[i] = 'a';
+        }
+        Text text = new Text();
+        text.set(partialResult);
+        return text;
+    }
+
+    @Override
+    public Text finishFinal() {
+        byte[] result = new byte[textLength];
+        for (int i = 0; i < result.length; i++) {
+            result[i] = 'a';
+        }
+        Text text = new Text();
+        text.set(result);
+        return text;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
similarity index 98%
rename from pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
index d0cf654..40ea690 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.pregelix.example.test;
+package edu.uci.ics.pregelix.example.util;
 
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index ab564fa..f3a0bb4 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -237,8 +237,12 @@
                     Writable agg = aggregator.finishPartial();
                     agg.write(tbGlobalAggregate.getDataOutput());
                     tbGlobalAggregate.addFieldEndOffset();
-                    appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
-                            tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+                    if (!appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+                            tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize())) {
+                        // aggregate state exceed the page size, write to HDFS
+                        FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, Vertex.getSuperstep());
+                        appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+                    }
                     FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index b4e1dd8..ca8ec01 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -179,7 +179,7 @@
                 vertex.setOutputWriters(writers);
                 vertex.setOutputAppenders(appenders);
                 vertex.setOutputTupleBuilders(tbs);
-                
+
                 if (vertex.isHalted()) {
                     vertex.activate();
                 }
@@ -230,8 +230,12 @@
                     Writable agg = aggregator.finishPartial();
                     agg.write(tbGlobalAggregate.getDataOutput());
                     tbGlobalAggregate.addFieldEndOffset();
-                    appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
-                            tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+                    if (!appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+                            tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize())) {
+                        // aggregate state exceed the page size, write to HDFS
+                        FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, Vertex.getSuperstep());
+                        appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+                    }
                     FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
index cd2012a..3dcdad2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
@@ -40,7 +40,7 @@
             @Override
             public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
                 Configuration conf = confFactory.createConfiguration(ctx);
-                IterationUtils.setProperties(jobId, ctx, conf);
+                IterationUtils.setProperties(jobId, ctx, conf, -1);
             }
 
         };
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
new file mode 100644
index 0000000..35e7cd8
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+/**
+ * Recover the pregelix job state in a NC
+ * 
+ * @author yingyib
+ */
+public class RecoveryRuntimeHookFactory implements IRuntimeHookFactory {
+    private static final long serialVersionUID = 1L;
+    private final int currentSuperStep;
+    private String jobId;
+    private IConfigurationFactory confFactory;
+
+    public RecoveryRuntimeHookFactory(String jobId, int currentSuperStep, IConfigurationFactory confFactory) {
+        this.currentSuperStep = currentSuperStep;
+        this.jobId = jobId;
+        this.confFactory = confFactory;
+    }
+
+    @Override
+    public IRuntimeHook createRuntimeHook() {
+        return new IRuntimeHook() {
+
+            @Override
+            public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
+                IterationUtils.endSuperStep(jobId, ctx);
+                Configuration conf = confFactory.createConfiguration(ctx);
+                IterationUtils.setProperties(jobId, ctx, conf, currentSuperStep);
+            }
+
+        };
+    }
+
+}