Merge branch 'master' into salsubaiee/master_lsm
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 51a9ce3..d68ad2c 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -538,4 +538,15 @@
     public static boolean useLSM(Configuration conf) {
         return conf.getBoolean(PregelixJob.UPDATE_INTENSIVE, false);
     }
+
+    /***
+     * Get the spilling dir name for global aggregates
+     * 
+     * @param conf
+     * @param superStep
+     * @return the spilling dir name
+     */
+    public static String getGlobalAggregateSpillingDirName(Configuration conf, long superStep) {
+        return "/tmp/pregelix/agg/" + conf.get(PregelixJob.JOB_ID) + "/" + superStep;
+    }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
index 24105ae..a0f67e3 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/FrameTupleUtils.java
@@ -15,6 +15,14 @@
 
 package edu.uci.ics.pregelix.api.util;
 
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -41,4 +49,19 @@
         }
     }
 
+    public static void flushTupleToHDFS(ArrayTupleBuilder atb, Configuration conf, long superStep)
+            throws HyracksDataException {
+        try {
+            if (atb.getSize()>0) {
+                FileSystem dfs = FileSystem.get(conf);
+                String fileName = BspUtils.getGlobalAggregateSpillingDirName(conf, superStep) +"/" + UUID.randomUUID();
+                FSDataOutputStream dos = dfs.create(new Path(fileName), true);
+                dos.write(atb.getByteArray(), 0, atb.getSize());
+                dos.flush();
+                dos.close();
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
 }
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index a3f24f4..3d1699f 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -90,21 +90,55 @@
                 <version>1.3</version>
 				<executions>
 					<execution>
+						<id>pregelix</id>
 						<configuration>
+							<platforms>
+								<platform>unix</platform>
+							</platforms>
 							<programs>
 								<program>
 									<mainClass>org.apache.hadoop.util.RunJar</mainClass>
-									<name>pregelix-obselete</name>
+									<name>pregelix</name>
 								</program>
 							</programs>
 							<repositoryLayout>flat</repositoryLayout>
 							<repositoryName>lib</repositoryName>
+							<configurationDirectory>etc:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$1"</configurationDirectory>
 						</configuration>
 						<phase>package</phase>
 						<goals>
 							<goal>assemble</goal>
 						</goals>
 					</execution>
+					<execution>
+						<id>cc_nc_drivers</id>
+						<configuration>
+							<platforms>
+								<platform>unix</platform>
+							</platforms>
+							<programs>
+								<program>
+									<mainClass>edu.uci.ics.hyracks.control.cc.CCDriver</mainClass>
+									<name>pregelixcc</name>
+								</program>
+								<program>
+									<mainClass>edu.uci.ics.hyracks.control.nc.NCDriver</mainClass>
+									<name>pregelixnc</name>
+									<commandLineArguments>
+										<commandLineArgument>-app-nc-main-class</commandLineArgument>
+										<commandLineArgument>edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint</commandLineArgument>
+									</commandLineArguments>
+								</program>
+							</programs>
+							<repositoryLayout>flat</repositoryLayout>
+							<repositoryName>lib</repositoryName>
+							<configurationDirectory>etc:"$HADOOP_HOME"/conf:/etc/hadoop/conf</configurationDirectory>
+						</configuration>
+						<phase>package</phase>
+						<goals>
+							<goal>assemble</goal>
+						</goals>
+					</execution>					
 				</executions>
 			</plugin>
 			<plugin>
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 3e6e9a5..7bd2cf8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -99,7 +99,7 @@
             IntWritable lastSnapshotSuperstep = new IntWritable(0);
             boolean failed = false;
             int retryCount = 0;
-            int maxRetryCount = 3;
+            int maxRetryCount = 1;
 
             do {
                 try {
@@ -142,12 +142,11 @@
                     //restart from snapshot
                     failed = true;
                     retryCount++;
-                    ioe.printStackTrace();
+                    throw new HyracksException(ioe);
                 }
             } while (failed && retryCount < maxRetryCount);
             LOG.info("job finished");
         } catch (Exception e) {
-            e.printStackTrace();
             throw new HyracksException(e);
         }
     }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index d37c6fd..36723a6 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -108,11 +108,13 @@
 import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
 import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
 import edu.uci.ics.pregelix.runtime.bootstrap.VirtualBufferCacheProvider;
+import edu.uci.ics.pregelix.runtime.touchpoint.RecoveryRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexPartitionComputerFactory;
@@ -374,9 +376,10 @@
     @Override
     public JobSpecification[] generateCheckpointing(int lastSuccessfulIteration) throws HyracksException {
         try {
+
             PregelixJob tmpJob = this.createCloneJob("Vertex checkpointing for job " + jobId, pregelixJob);
             tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
-            FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath));
+            FileOutputFormat.setOutputPath(tmpJob, new Path(vertexCheckpointPath + "/" + lastSuccessfulIteration));
             tmpJob.setOutputKeyClass(NullWritable.class);
             tmpJob.setOutputValueClass(BspUtils.getVertexClass(tmpJob.getConfiguration()));
             JobSpecification vertexCkpSpec = scanIndexWriteToHDFS(tmpJob.getConfiguration());
@@ -409,7 +412,7 @@
         try {
             PregelixJob tmpJob = this.createCloneJob("Vertex checkpoint loading for job " + jobId, pregelixJob);
             tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
-            FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath));
+            FileInputFormat.setInputPaths(tmpJob, new Path(vertexCheckpointPath + "/" + lastCheckpointedIteration));
             JobSpecification vertexLoadSpec = loadHDFSData(tmpJob.getConfiguration());
             JobSpecification[] stateLoadSpecs = generateStateCheckpointLoading(lastCheckpointedIteration, tmpJob);
             JobSpecification[] specs = new JobSpecification[1 + stateLoadSpecs.length];
@@ -637,7 +640,7 @@
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message";
+        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastSuccessfulIteration;
         PregelixJob tmpJob = createCloneJob("State checkpointing for job " + jobId, pregelixJob);
         tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
         FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
@@ -657,7 +660,7 @@
     @SuppressWarnings({ "unchecked", "rawtypes" })
     protected JobSpecification[] generateStateCheckpointLoading(int lastCheckpointedIteration, PregelixJob job)
             throws HyracksException {
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message";
+        String checkpointPath = "/tmp/ckpoint/" + jobId + "/message/" + lastCheckpointedIteration;
         PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
         tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
         try {
@@ -696,6 +699,12 @@
                 recordDescriptor);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
+        /** construct runtime hook */
+        RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+                new RecoveryRuntimeHookFactory(jobId, lastCheckpointedIteration + 1, new ConfigurationFactory(
+                        pregelixJob.getConfiguration())));
+        ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
         /** construct empty sink operator */
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
         ClusterConfig.setLocationConstraint(spec, emptySink);
@@ -706,7 +715,8 @@
         ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0,
                 materialize, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, emptySink, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
         spec.setFrameSize(frameSize);
         return new JobSpecification[] { spec };
     }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 58384b2..41887c0 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -529,7 +529,7 @@
         tmpJob.setVertexOutputFormatClass(InternalVertexOutputFormat.class);
 
         /** generate secondary index checkpoint */
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary";
+        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;
         FileOutputFormat.setOutputPath(tmpJob, new Path(checkpointPath));
         tmpJob.setOutputKeyClass(BspUtils.getVertexIndexClass(tmpJob.getConfiguration()));
         tmpJob.setOutputValueClass(MsgList.class);
@@ -569,7 +569,7 @@
         Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(job.getConfiguration());
         JobSpecification spec = new JobSpecification();
 
-        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary";
+        String checkpointPath = "/tmp/ckpoint/" + jobId + "/secondary/" + lastSuccessfulIteration;;
         PregelixJob tmpJob = createCloneJob("State checkpoint loading for job " + jobId, job);
         tmpJob.setVertexInputFormatClass(InternalVertexInputFormat.class);
         try {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index e8a6b5c..e1795de 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -61,6 +61,8 @@
         ccConfig.defaultMaxJobAttempts = 0;
         ccConfig.jobHistorySize = 1;
         ccConfig.profileDumpPeriod = -1;
+        //ccConfig.heartbeatPeriod = 5000;
+        //ccConfig.maxHeartbeatLapsePeriods = 1;
 
         // cluster controller
         cc = new ClusterControllerService(ccConfig);
@@ -74,7 +76,7 @@
         ncConfig1.dataIPAddress = "127.0.0.1";
         ncConfig1.datasetIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
-        ncConfig1.ioDevices="dev1,dev2";
+        ncConfig1.ioDevices = "dev1,dev2";
         ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
@@ -87,7 +89,7 @@
         ncConfig2.datasetIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         ncConfig2.appNCMainClass = NCApplicationEntryPoint.class.getName();
-        ncConfig2.ioDevices="dev3,dev4";
+        ncConfig2.ioDevices = "dev3,dev4";
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
 
@@ -96,6 +98,14 @@
         ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
     }
 
+    public static void showDownNC1() throws Exception {
+        nc1.stop();
+    }
+
+    public static void showDownNC2() throws Exception {
+        nc2.stop();
+    }
+
     public static void deinit() throws Exception {
         nc2.stop();
         nc1.stop();
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelix b/pregelix/pregelix-core/src/main/resources/scripts/pregelix
deleted file mode 100644
index 7232ccc..0000000
--- a/pregelix/pregelix-core/src/main/resources/scripts/pregelix
+++ /dev/null
@@ -1,111 +0,0 @@
-#!/bin/sh
-#
-#------------------------------------------------------------------------
-# Copyright 2009-2013 by The Regents of the University of California
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# you may obtain a copy of the License from
-# 
-#     http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ------------------------------------------------------------------------
-#
-
-
-# resolve links - $0 may be a softlink
-PRG="$0"
-
-while [ -h "$PRG" ]; do
-  ls=`ls -ld "$PRG"`
-  link=`expr "$ls" : '.*-> \(.*\)$'`
-  if expr "$link" : '/.*' > /dev/null; then
-    PRG="$link"
-  else
-    PRG=`dirname "$PRG"`/"$link"
-  fi
-done
-
-PRGDIR=`dirname "$PRG"`
-BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
-
-
-
-# OS specific support.  $var _must_ be set to either true or false.
-cygwin=false;
-darwin=false;
-case "`uname`" in
-  CYGWIN*) cygwin=true ;;
-  Darwin*) darwin=true
-           if [ -z "$JAVA_VERSION" ] ; then
-             JAVA_VERSION="CurrentJDK"
-           else
-             echo "Using Java version: $JAVA_VERSION"
-           fi
-           if [ -z "$JAVA_HOME" ] ; then
-             JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
-           fi
-           ;;
-esac
-
-if [ -z "$JAVA_HOME" ] ; then
-  if [ -r /etc/gentoo-release ] ; then
-    JAVA_HOME=`java-config --jre-home`
-  fi
-fi
-
-# For Cygwin, ensure paths are in UNIX format before anything is touched
-if $cygwin ; then
-  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
-  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
-fi
-
-# If a specific java binary isn't specified search for the standard 'java' binary
-if [ -z "$JAVACMD" ] ; then
-  if [ -n "$JAVA_HOME"  ] ; then
-    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
-      # IBM's JDK on AIX uses strange locations for the executables
-      JAVACMD="$JAVA_HOME/jre/sh/java"
-    else
-      JAVACMD="$JAVA_HOME/bin/java"
-    fi
-  else
-    JAVACMD=`which java`
-  fi
-fi
-
-if [ ! -x "$JAVACMD" ] ; then
-  echo "Error: JAVA_HOME is not defined correctly." 1>&2
-  echo "  We cannot execute $JAVACMD" 1>&2
-  exit 1
-fi
-
-if [ -z "$REPO" ]
-then
-  REPO="$BASEDIR"/lib
-fi
-
-CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$BASEDIR"/etc:$1
-
-# For Cygwin, switch paths to Windows format before running java
-if $cygwin; then
-  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
-  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
-  [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`
-  [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
-  [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
-fi
-
-exec "$JAVACMD" $JAVA_OPTS  \
-  -classpath "$CLASSPATH" \
-  -Dapp.name="pregelix" \
-  -Dapp.pid="$$" \
-  -Dapp.repo="$REPO" \
-  -Dapp.home="$BASEDIR" \
-  -Dbasedir="$BASEDIR" \
-  org.apache.hadoop.util.RunJar \
-  "$@"
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelix.bat b/pregelix/pregelix-core/src/main/resources/scripts/pregelix.bat
deleted file mode 100644
index fe53029..0000000
--- a/pregelix/pregelix-core/src/main/resources/scripts/pregelix.bat
+++ /dev/null
@@ -1,124 +0,0 @@
-@rem /*
-@rem Copyright 2009-2013 by The Regents of the University of California
-@rem Licensed under the Apache License, Version 2.0 (the "License");
-@rem you may not use this file except in compliance with the License.
-@rem you may obtain a copy of the License from
-@rem 
-@rem     http://www.apache.org/licenses/LICENSE-2.0
-@rem 
-@rem Unless required by applicable law or agreed to in writing, software
-@rem distributed under the License is distributed on an "AS IS" BASIS,
-@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-@rem See the License for the specific language governing permissions and
-@rem limitations under the License.
-@rem */
-@REM ----------------------------------------------------------------------------

-@REM  Copyright 2001-2006 The Apache Software Foundation.

-@REM

-@REM  Licensed under the Apache License, Version 2.0 (the "License");

-@REM  you may not use this file except in compliance with the License.

-@REM  You may obtain a copy of the License at

-@REM

-@REM       http://www.apache.org/licenses/LICENSE-2.0

-@REM

-@REM  Unless required by applicable law or agreed to in writing, software

-@REM  distributed under the License is distributed on an "AS IS" BASIS,

-@REM  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

-@REM  See the License for the specific language governing permissions and

-@REM  limitations under the License.

-@REM ----------------------------------------------------------------------------

-@REM

-@REM   Copyright (c) 2001-2006 The Apache Software Foundation.  All rights

-@REM   reserved.

-

-@echo off

-

-set ERROR_CODE=0

-

-:init

-@REM Decide how to startup depending on the version of windows

-

-@REM -- Win98ME

-if NOT "%OS%"=="Windows_NT" goto Win9xArg

-

-@REM set local scope for the variables with windows NT shell

-if "%OS%"=="Windows_NT" @setlocal

-

-@REM -- 4NT shell

-if "%eval[2+2]" == "4" goto 4NTArgs

-

-@REM -- Regular WinNT shell

-set CMD_LINE_ARGS=%*

-goto WinNTGetScriptDir

-

-@REM The 4NT Shell from jp software

-:4NTArgs

-set CMD_LINE_ARGS=%$

-goto WinNTGetScriptDir

-

-:Win9xArg

-@REM Slurp the command line arguments.  This loop allows for an unlimited number

-@REM of arguments (up to the command line limit, anyway).

-set CMD_LINE_ARGS=

-:Win9xApp

-if %1a==a goto Win9xGetScriptDir

-set CMD_LINE_ARGS=%CMD_LINE_ARGS% %1

-shift

-goto Win9xApp

-

-:Win9xGetScriptDir

-set SAVEDIR=%CD%

-%0\

-cd %0\..\.. 

-set BASEDIR=%CD%

-cd %SAVEDIR%

-set SAVE_DIR=

-goto repoSetup

-

-:WinNTGetScriptDir

-set BASEDIR=%~dp0\..

-

-:repoSetup

-

-

-if "%JAVACMD%"=="" set JAVACMD=java

-

-if "%REPO%"=="" set REPO=%BASEDIR%\lib

-

-cp $BASEDIR"\..\a-hadoop-patch.jar "$REPO"\

-

-set CLASSPATH="%BASEDIR%"\etc;"%REPO%"\a-hadoop-patch.jar;"%REPO%"\pregelix-api-0.0.1-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-api-0.2.2-SNAPSHOT.jar;"%REPO%"\json-20090211.jar;"%REPO%"\httpclient-4.1-alpha2.jar;"%REPO%"\httpcore-4.1-beta1.jar;"%REPO%"\commons-logging-1.1.1.jar;"%REPO%"\commons-codec-1.3.jar;"%REPO%"\args4j-2.0.12.jar;"%REPO%"\hyracks-ipc-0.2.2-SNAPSHOT.jar;"%REPO%"\commons-lang3-3.1.jar;"%REPO%"\hyracks-data-std-0.2.2-SNAPSHOT.jar;"%REPO%"\hadoop-core-0.20.2.jar;"%REPO%"\commons-cli-1.2.jar;"%REPO%"\xmlenc-0.52.jar;"%REPO%"\commons-httpclient-3.0.1.jar;"%REPO%"\commons-net-1.4.1.jar;"%REPO%"\oro-2.0.8.jar;"%REPO%"\jetty-6.1.14.jar;"%REPO%"\jetty-util-6.1.14.jar;"%REPO%"\servlet-api-2.5-6.1.14.jar;"%REPO%"\jasper-runtime-5.5.12.jar;"%REPO%"\jasper-compiler-5.5.12.jar;"%REPO%"\jsp-api-2.1-6.1.14.jar;"%REPO%"\jsp-2.1-6.1.14.jar;"%REPO%"\core-3.1.1.jar;"%REPO%"\ant-1.6.5.jar;"%REPO%"\commons-el-1.0.jar;"%REPO%"\jets3t-0.7.1.jar;"%REPO%"\kfs-0.3.jar;"%REPO%"\hsqldb-1.8.0.10.jar;"%REPO%"\pregelix-dataflow-std-0.0.1-SNAPSHOT.jar;"%REPO%"\pregelix-dataflow-std-base-0.0.1-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-std-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-hadoop-0.2.2-SNAPSHOT.jar;"%REPO%"\dcache-client-0.0.1.jar;"%REPO%"\jetty-client-8.0.0.M0.jar;"%REPO%"\jetty-http-8.0.0.RC0.jar;"%REPO%"\jetty-io-8.0.0.RC0.jar;"%REPO%"\jetty-util-8.0.0.RC0.jar;"%REPO%"\hyracks-storage-am-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-storage-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-storage-am-btree-0.2.2-SNAPSHOT.jar;"%REPO%"\btreehelper-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-control-cc-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-control-common-0.2.2-SNAPSHOT.jar;"%REPO%"\commons-io-1.3.1.jar;"%REPO%"\jetty-server-8.0.0.RC0.jar;"%REPO%"\servlet-api-3.0.20100224.jar;"%REPO%"\jetty-continuation-8.0.0.RC0.jar;"%REPO%"\jetty-webapp-8.0.0.RC0.jar;"%REPO%"\jetty-xml-8.0.0.RC0.jar;"%REPO%"\jetty-servlet-8.0.0.RC0.jar;"%REPO%"\jetty-security-8.0.0.RC0.jar;"%REPO%"\wicket-core-1.5.2.jar;"%REPO%"\wicket-util-1.5.2.jar;"%REPO%"\slf4j-api-1.6.1.jar;"%REPO%"\wicket-request-1.5.2.jar;"%REPO%"\slf4j-jcl-1.6.3.jar;"%REPO%"\hyracks-control-nc-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-net-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-hadoop-compat-0.2.2-SNAPSHOT.jar;"%REPO%"\pregelix-dataflow-0.0.1-SNAPSHOT.jar;"%REPO%"\pregelix-runtime-0.0.1-SNAPSHOT.jar;"%REPO%"\hadoop-test-0.20.2.jar;"%REPO%"\ftplet-api-1.0.0.jar;"%REPO%"\mina-core-2.0.0-M5.jar;"%REPO%"\ftpserver-core-1.0.0.jar;"%REPO%"\ftpserver-deprecated-1.0.0-M2.jar;"%REPO%"\javax.servlet-api-3.0.1.jar;"%REPO%"\pregelix-core-0.0.1-SNAPSHOT.jar

-goto endInit

-

-@REM Reaching here means variables are defined and arguments have been captured

-:endInit

-

-%JAVACMD% %JAVA_OPTS%  -classpath %CLASSPATH_PREFIX%;%CLASSPATH% -Dapp.name="pregelix" -Dapp.repo="%REPO%" -Dapp.home="%BASEDIR%" -Dbasedir="%BASEDIR%" org.apache.hadoop.util.RunJar %CMD_LINE_ARGS%

-if ERRORLEVEL 1 goto error

-goto end

-

-:error

-if "%OS%"=="Windows_NT" @endlocal

-set ERROR_CODE=%ERRORLEVEL%

-

-:end

-@REM set local scope for the variables with windows NT shell

-if "%OS%"=="Windows_NT" goto endNT

-

-@REM For old DOS remove the set variables from ENV - we assume they were not set

-@REM before we started - at least we don't leave any baggage around

-set CMD_LINE_ARGS=

-goto postExec

-

-:endNT

-@REM If error code is set to 1 then the endlocal was done already in :error.

-if %ERROR_CODE% EQU 0 @endlocal

-

-

-:postExec

-

-if "%FORCE_EXIT_ON_ERROR%" == "on" (

-  if %ERROR_CODE% NEQ 0 exit %ERROR_CODE%

-)

-

-exit /B %ERROR_CODE%
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc b/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc
deleted file mode 100755
index c1ee3f2..0000000
--- a/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc
+++ /dev/null
@@ -1,114 +0,0 @@
-#!/bin/sh
-#
-#------------------------------------------------------------------------
-# Copyright 2009-2013 by The Regents of the University of California
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# you may obtain a copy of the License from
-# 
-#     http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ------------------------------------------------------------------------
-#
-
-# resolve links - $0 may be a softlink
-PRG="$0"
-
-while [ -h "$PRG" ]; do
-  ls=`ls -ld "$PRG"`
-  link=`expr "$ls" : '.*-> \(.*\)$'`
-  if expr "$link" : '/.*' > /dev/null; then
-    PRG="$link"
-  else
-    PRG=`dirname "$PRG"`/"$link"
-  fi
-done
-
-PRGDIR=`dirname "$PRG"`
-BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
-
-
-
-# OS specific support.  $var _must_ be set to either true or false.
-cygwin=false;
-darwin=false;
-case "`uname`" in
-  CYGWIN*) cygwin=true ;;
-  Darwin*) darwin=true
-           if [ -z "$JAVA_VERSION" ] ; then
-             JAVA_VERSION="CurrentJDK"
-           else
-             echo "Using Java version: $JAVA_VERSION"
-           fi
-           if [ -z "$JAVA_HOME" ] ; then
-             JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
-           fi
-           ;;
-esac
-
-if [ -z "$JAVA_HOME" ] ; then
-  if [ -r /etc/gentoo-release ] ; then
-    JAVA_HOME=`java-config --jre-home`
-  fi
-fi
-
-# For Cygwin, ensure paths are in UNIX format before anything is touched
-if $cygwin ; then
-  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
-  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
-fi
-
-# If a specific java binary isn't specified search for the standard 'java' binary
-if [ -z "$JAVACMD" ] ; then
-  if [ -n "$JAVA_HOME"  ] ; then
-    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
-      # IBM's JDK on AIX uses strange locations for the executables
-      JAVACMD="$JAVA_HOME/jre/sh/java"
-    else
-      JAVACMD="$JAVA_HOME/bin/java"
-    fi
-  else
-    JAVACMD=`which java`
-  fi
-fi
-
-if [ ! -x "$JAVACMD" ] ; then
-  echo "Error: JAVA_HOME is not defined correctly." 1>&2
-  echo "  We cannot execute $JAVACMD" 1>&2
-  exit 1
-fi
-
-if [ -z "$REPO" ]
-then
-  REPO="$BASEDIR"/lib
-fi
-
-CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$BASEDIR"/etc:$1
-
-for f in ${REPO}/*.jar; do
-  CLASSPATH=${CLASSPATH}:$f;
-done
-
-# For Cygwin, switch paths to Windows format before running java
-if $cygwin; then
-  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
-  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
-  [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`
-  [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
-  [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
-fi
-
-exec "$JAVACMD" $JAVA_OPTS  \
-  -classpath "$CLASSPATH" \
-  -Dapp.name="pregelixcc" \
-  -Dapp.pid="$$" \
-  -Dapp.repo="$REPO" \
-  -Dapp.home="$BASEDIR" \
-  -Dbasedir="$BASEDIR" \
-  edu.uci.ics.hyracks.control.cc.CCDriver \
-  "$@"
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc b/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc
deleted file mode 100755
index c01b4b4..0000000
--- a/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc
+++ /dev/null
@@ -1,115 +0,0 @@
-#!/bin/sh
-#
-#------------------------------------------------------------------------
-# Copyright 2009-2013 by The Regents of the University of California
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# you may obtain a copy of the License from
-# 
-#     http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ------------------------------------------------------------------------
-#
-
-# resolve links - $0 may be a softlink
-PRG="$0"
-
-while [ -h "$PRG" ]; do
-  ls=`ls -ld "$PRG"`
-  link=`expr "$ls" : '.*-> \(.*\)$'`
-  if expr "$link" : '/.*' > /dev/null; then
-    PRG="$link"
-  else
-    PRG=`dirname "$PRG"`/"$link"
-  fi
-done
-
-PRGDIR=`dirname "$PRG"`
-BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
-
-
-
-# OS specific support.  $var _must_ be set to either true or false.
-cygwin=false;
-darwin=false;
-case "`uname`" in
-  CYGWIN*) cygwin=true ;;
-  Darwin*) darwin=true
-           if [ -z "$JAVA_VERSION" ] ; then
-             JAVA_VERSION="CurrentJDK"
-           else
-             echo "Using Java version: $JAVA_VERSION"
-           fi
-           if [ -z "$JAVA_HOME" ] ; then
-             JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
-           fi
-           ;;
-esac
-
-if [ -z "$JAVA_HOME" ] ; then
-  if [ -r /etc/gentoo-release ] ; then
-    JAVA_HOME=`java-config --jre-home`
-  fi
-fi
-
-# For Cygwin, ensure paths are in UNIX format before anything is touched
-if $cygwin ; then
-  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
-  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
-fi
-
-# If a specific java binary isn't specified search for the standard 'java' binary
-if [ -z "$JAVACMD" ] ; then
-  if [ -n "$JAVA_HOME"  ] ; then
-    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
-      # IBM's JDK on AIX uses strange locations for the executables
-      JAVACMD="$JAVA_HOME/jre/sh/java"
-    else
-      JAVACMD="$JAVA_HOME/bin/java"
-    fi
-  else
-    JAVACMD=`which java`
-  fi
-fi
-
-if [ ! -x "$JAVACMD" ] ; then
-  echo "Error: JAVA_HOME is not defined correctly." 1>&2
-  echo "  We cannot execute $JAVACMD" 1>&2
-  exit 1
-fi
-
-if [ -z "$REPO" ]
-then
-  REPO="$BASEDIR"/lib
-fi
-
-CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$BASEDIR"/etc:$1
-
-for f in ${REPO}/*.jar; do
-  CLASSPATH=${CLASSPATH}:$f;
-done
-
-
-# For Cygwin, switch paths to Windows format before running java
-if $cygwin; then
-  [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
-  [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
-  [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`2
-  [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
-  [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
-fi
-
-exec "$JAVACMD" $JAVA_OPTS  \
-  -classpath "$CLASSPATH" \
-  -Dapp.name="pregelixnc" \
-  -Dapp.pid="$$" \
-  -Dapp.repo="$REPO" \
-  -Dapp.home="$BASEDIR" \
-  -Dbasedir="$BASEDIR" \
-  edu.uci.ics.hyracks.control.nc.NCDriver \
-  -app-nc-main-class edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint "$@"
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
index 3fed609..c0be9dd 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
@@ -17,9 +17,13 @@
 
 import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -32,6 +36,7 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
@@ -93,10 +98,28 @@
 
             }
 
+            @SuppressWarnings("unchecked")
             @Override
             public void close() throws HyracksDataException {
-                Writable finalAggregateValue = aggregator.finishFinal();
-                IterationUtils.writeGlobalAggregateValue(conf, jobId, finalAggregateValue);
+                try {
+                    // iterate over hdfs spilled aggregates
+                    FileSystem dfs = FileSystem.get(conf);
+                    String spillingDir = BspUtils.getGlobalAggregateSpillingDirName(conf, Vertex.getSuperstep());
+                    FileStatus[] files = dfs.listStatus(new Path(spillingDir));
+                    if (files != null) {
+                        // goes into this branch only when there are spilled files
+                        for (int i = 0; i < files.length; i++) {
+                            FileStatus file = files[i];
+                            DataInput dis = dfs.open(file.getPath());
+                            partialAggregateValue.readFields(dis);
+                            aggregator.step(partialAggregateValue);
+                        }
+                    }
+                    Writable finalAggregateValue = aggregator.finishFinal();
+                    IterationUtils.writeGlobalAggregateValue(conf, jobId, finalAggregateValue);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
             }
 
         };
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index b74a5de..9a21680 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -168,7 +168,8 @@
                 FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
                     @Override
                     public boolean accept(Path dir) {
-                        return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
+                        return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0
+                                && dir.getName().indexOf(".crc") < 0;
                     }
                 });
                 return results;
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index adcb702..bfe89ab 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -147,7 +147,7 @@
         return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
     }
 
-    public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges) {
+    public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, int currentIteration) {
         Boolean toMove = jobIdToMove.get(jobId);
         if (toMove == null || toMove == true) {
             if (jobIdToSuperStep.get(jobId) == null) {
@@ -161,7 +161,11 @@
                     fileRef.delete();
             }
 
-            Vertex.setSuperstep(++superStep);
+            if (currentIteration > 0) {
+                Vertex.setSuperstep(currentIteration);
+            } else {
+                Vertex.setSuperstep(++superStep);
+            }
             Vertex.setNumVertices(numVertices);
             Vertex.setNumEdges(numEdges);
             jobIdToSuperStep.put(jobId, superStep);
@@ -171,8 +175,8 @@
         System.gc();
     }
 
-    public synchronized void endSuperStep(String giraphJobId) {
-        jobIdToMove.put(giraphJobId, true);
+    public synchronized void endSuperStep(String pregelixJobId) {
+        jobIdToMove.put(pregelixJobId, true);
         LOGGER.info("end iteration " + Vertex.getSuperstep());
     }
 
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 603a464..1cf81ac 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -75,11 +75,11 @@
         context.endSuperStep(giraphJobId);
     }
 
-    public static void setProperties(String giraphJobId, IHyracksTaskContext ctx, Configuration conf) {
+    public static void setProperties(String jobId, IHyracksTaskContext ctx, Configuration conf, int currentIteration) {
         INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
         RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-        context.setVertexProperties(giraphJobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
-                conf.getLong(PregelixJob.NUM_EDGES, -1));
+        context.setVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+                conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
     }
 
     public static void writeTerminationState(Configuration conf, String jobId, boolean terminate)
diff --git a/pregelix/pregelix-example/pom.xml b/pregelix/pregelix-example/pom.xml
index 1066e3b..9994c0e 100644
--- a/pregelix/pregelix-example/pom.xml
+++ b/pregelix/pregelix-example/pom.xml
@@ -94,7 +94,7 @@
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-clean-plugin</artifactId>
-				<version>2.4.1</version>
+				<version>2.5</version>
 				<configuration>
 					<filesets>
 						<fileset>
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
new file mode 100644
index 0000000..dac087f
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureRecoveryTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class FailureRecoveryTest {
+    private static String INPUTPATH = "data/webmap";
+    private static String OUTPUTPAH = "actual/result";
+    private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+
+    @Test
+    public void test() throws Exception {
+        TestCluster testCluster = new TestCluster();
+
+        try {
+            PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+            job.setVertexClass(PageRankVertex.class);
+            job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+            job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+            job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+            job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+            FileInputFormat.setInputPaths(job, INPUTPATH);
+            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+            job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+            job.setCheckpointHook(ConservativeCheckpointHook.class);
+
+            testCluster.setUp();
+            Driver driver = new Driver(PageRankVertex.class);
+            //            Thread thread = new Thread(new Runnable() {
+            //
+            //                @Override
+            //                public void run() {
+            //                    try {
+            //                        synchronized (this) {
+            //                            this.wait(10000);
+            //                            PregelixHyracksIntegrationUtil.showDownNC1();
+            //                        }
+            //                    } catch (Exception e) {
+            //                        throw new IllegalStateException(e);
+            //                    }
+            //                }
+            //
+            //            });
+            //thread.start();
+            driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+            TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+        } catch (Exception e) {
+            throw e;
+        } finally {
+            testCluster.tearDown();
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertexTest.java
similarity index 96%
rename from pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertexTest.java
index 5a2636a..a2d32c0 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/FailureVertexTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/FailureVertexTest.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.pregelix.example.test;
+package edu.uci.ics.pregelix.example;
 
 import junit.framework.Assert;
 
@@ -29,6 +29,7 @@
 import edu.uci.ics.pregelix.example.FailureVertex;
 import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
 
 /**
  * This test case tests the error message propagation.
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
similarity index 91%
rename from pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
index d2995f1..5a485ba 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/JobConcatenationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/JobConcatenationTest.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.pregelix.example.test;
+package edu.uci.ics.pregelix.example;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -24,13 +24,12 @@
 import org.junit.Test;
 
 import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
 import edu.uci.ics.pregelix.core.driver.Driver;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
-import edu.uci.ics.pregelix.example.PageRankVertex;
 import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
 import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
 import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
 import edu.uci.ics.pregelix.example.util.TestUtils;
 
 /**
@@ -56,7 +55,7 @@
             job1.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
             FileInputFormat.setInputPaths(job1, INPUTPATH);
             job1.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
-            job1.setCheckpointHook(ConservativeCheckpointHook.class);
+            //job1.setCheckpointHook(ConservativeCheckpointHook.class);
 
             PregelixJob job2 = new PregelixJob(PageRankVertex.class.getName());
             job2.setVertexClass(PageRankVertex.class);
@@ -66,7 +65,7 @@
             job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
             FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH));
             job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
-            job2.setCheckpointHook(ConservativeCheckpointHook.class);
+            //job2.setCheckpointHook(ConservativeCheckpointHook.class);
 
             jobs.add(job1);
             jobs.add(job2);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/OverflowAggregatorTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/OverflowAggregatorTest.java
new file mode 100644
index 0000000..474d0a6
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/OverflowAggregatorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+import edu.uci.ics.pregelix.example.PageRankVertex.SimplePageRankVertexOutputFormat;
+import edu.uci.ics.pregelix.example.aggregator.OverflowAggregator;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * @author yingyib
+ */
+public class OverflowAggregatorTest {
+
+    private static String INPUTPATH = "data/webmap";
+    private static String OUTPUTPAH = "actual/result";
+    private static String EXPECTEDPATH = "src/test/resources/expected/PageRankReal";
+
+    @Test
+    public void test() throws Exception {
+        TestCluster testCluster = new TestCluster();
+
+        try {
+            PregelixJob job = new PregelixJob(PageRankVertex.class.getName());
+            job.setVertexClass(PageRankVertex.class);
+            job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+            job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+            job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+            job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+            FileInputFormat.setInputPaths(job, INPUTPATH);
+            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+            job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+            job.setGlobalAggregatorClass(OverflowAggregator.class);
+
+            testCluster.setUp();
+            Driver driver = new Driver(PageRankVertex.class);
+            driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+
+            TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+            Text text = (Text) IterationUtils.readGlobalAggregateValue(job.getConfiguration(),
+                    BspUtils.getJobId(job.getConfiguration()));
+            Assert.assertEquals(text.getLength(), 20 * 32767);
+        } catch (Exception e) {
+            throw e;
+        } finally {
+            testCluster.tearDown();
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/aggregator/OverflowAggregator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/aggregator/OverflowAggregator.java
new file mode 100644
index 0000000..34b8b51
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/aggregator/OverflowAggregator.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.example.aggregator;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.example.io.DoubleWritable;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Test the case where the global aggregate's state is bloated
+ * 
+ * @author yingyib
+ */
+public class OverflowAggregator extends
+        GlobalAggregator<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable, Text, Text> {
+
+    private int textLength = 0;
+    private int inc = 32767;
+
+    @Override
+    public void init() {
+        textLength = 0;
+    }
+
+    @Override
+    public void step(Vertex<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> v)
+            throws HyracksDataException {
+        textLength += inc;
+    }
+
+    @Override
+    public void step(Text partialResult) {
+        textLength += partialResult.getLength();
+    }
+
+    @Override
+    public Text finishPartial() {
+        byte[] partialResult = new byte[textLength];
+        for (int i = 0; i < partialResult.length; i++) {
+            partialResult[i] = 'a';
+        }
+        Text text = new Text();
+        text.set(partialResult);
+        return text;
+    }
+
+    @Override
+    public Text finishFinal() {
+        byte[] result = new byte[textLength];
+        for (int i = 0; i < result.length; i++) {
+            result[i] = 'a';
+        }
+        Text text = new Text();
+        text.set(result);
+        return text;
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
similarity index 98%
rename from pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java
rename to pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
index d0cf654..40ea690 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/test/TestCluster.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestCluster.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.pregelix.example.test;
+package edu.uci.ics.pregelix.example.util;
 
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index ab564fa..f3a0bb4 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -237,8 +237,12 @@
                     Writable agg = aggregator.finishPartial();
                     agg.write(tbGlobalAggregate.getDataOutput());
                     tbGlobalAggregate.addFieldEndOffset();
-                    appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
-                            tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+                    if (!appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+                            tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize())) {
+                        // aggregate state exceed the page size, write to HDFS
+                        FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, Vertex.getSuperstep());
+                        appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+                    }
                     FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index b4e1dd8..ca8ec01 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -179,7 +179,7 @@
                 vertex.setOutputWriters(writers);
                 vertex.setOutputAppenders(appenders);
                 vertex.setOutputTupleBuilders(tbs);
-                
+
                 if (vertex.isHalted()) {
                     vertex.activate();
                 }
@@ -230,8 +230,12 @@
                     Writable agg = aggregator.finishPartial();
                     agg.write(tbGlobalAggregate.getDataOutput());
                     tbGlobalAggregate.addFieldEndOffset();
-                    appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
-                            tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize());
+                    if (!appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
+                            tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize())) {
+                        // aggregate state exceed the page size, write to HDFS
+                        FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, Vertex.getSuperstep());
+                        appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
+                    }
                     FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
index cd2012a..3dcdad2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
@@ -40,7 +40,7 @@
             @Override
             public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
                 Configuration conf = confFactory.createConfiguration(ctx);
-                IterationUtils.setProperties(jobId, ctx, conf);
+                IterationUtils.setProperties(jobId, ctx, conf, -1);
             }
 
         };
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
new file mode 100644
index 0000000..35e7cd8
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RecoveryRuntimeHookFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+/**
+ * Recover the pregelix job state in a NC
+ * 
+ * @author yingyib
+ */
+public class RecoveryRuntimeHookFactory implements IRuntimeHookFactory {
+    private static final long serialVersionUID = 1L;
+    private final int currentSuperStep;
+    private String jobId;
+    private IConfigurationFactory confFactory;
+
+    public RecoveryRuntimeHookFactory(String jobId, int currentSuperStep, IConfigurationFactory confFactory) {
+        this.currentSuperStep = currentSuperStep;
+        this.jobId = jobId;
+        this.confFactory = confFactory;
+    }
+
+    @Override
+    public IRuntimeHook createRuntimeHook() {
+        return new IRuntimeHook() {
+
+            @Override
+            public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
+                IterationUtils.endSuperStep(jobId, ctx);
+                Configuration conf = confFactory.createConfiguration(ctx);
+                IterationUtils.setProperties(jobId, ctx, conf, currentSuperStep);
+            }
+
+        };
+    }
+
+}