addd partition early termination support
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 3f0a3ec..c52130d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -33,6 +34,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.pregelix.api.io.WritableSizable;
 import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.JobStateUtils;
 import edu.uci.ics.pregelix.api.util.SerDeUtils;
 
 /**
@@ -76,6 +78,8 @@
     private boolean hasMessage = false;
     /** created new vertex */
     private boolean createdNewLiveVertex = false;
+    /** terminate the partition */
+    private boolean terminatePartition = false;
 
     /**
      * use object pool for re-using objects
@@ -88,12 +92,23 @@
     private int usedValue = 0;
 
     /**
-     * The key method that users need to implement
+     * The key method that users need to implement to process
+     * incoming messages in each superstep.
+     * 1. In a superstep, this method can be called multiple times in a continuous manner for a single
+     * vertex, each of which is to process a batch of messages. (Note that
+     * this only happens for the case when the mssages for a single vertex
+     * exceed one frame.)
+     * 2. In each superstep, before any invocation of this method for a vertex,
+     * open() is called; after all the invocations of this method for the vertex,
+     * close is called.
+     * 3. In each partition, the vertex Java object is reused
+     * for all the vertice to be processed in the same partition. (The model
+     * is the same as the key-value objects in hadoop map tasks.)
      * 
      * @param msgIterator
      *            an iterator of incoming messages
      */
-    public abstract void compute(Iterator<M> msgIterator);
+    public abstract void compute(Iterator<M> msgIterator) throws Exception;
 
     /**
      * Add an edge for the vertex.
@@ -583,6 +598,8 @@
 
     /**
      * called immediately before invocations of compute() on a vertex
+     * Users can override this method to initiate the state for a vertex
+     * before the compute() invocations
      */
     public void open() {
 
@@ -590,9 +607,40 @@
 
     /**
      * called immediately after all the invocations of compute() on a vertex
+     * Users can override this method to initiate the state for a vertex
+     * before the compute() invocations
      */
     public void close() {
 
     }
 
+    /**
+     * Terminate the current partition where the current vertex stays in.
+     * This will immediately take effect and the upcoming vertice in the
+     * same partition cannot be processed.
+     * 
+     */
+    protected final void terminatePartition() {
+        voteToHalt();
+        terminatePartition = true;
+    }
+
+    /**
+     * Terminate the Pregelix job.
+     * This will take effect only when the current iteration completed.
+     * 
+     * @throws Exception
+     */
+    protected void terminateJob() throws Exception {
+        Configuration conf = getContext().getConfiguration();
+        JobStateUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
+    }
+
+    /***
+     * @return true if the partition is terminated; false otherwise
+     */
+    public boolean isPartitionTerminated() {
+        return terminatePartition;
+    }
+
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java
new file mode 100644
index 0000000..4a98167
--- /dev/null
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/JobStateUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.api.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * @author yingyib
+ */
+public class JobStateUtils {
+
+    public static final String TMP_DIR = "/tmp/";
+
+    public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = TMP_DIR + jobId + "fterm";
+            Path path = new Path(pathStr);
+            if (!dfs.exists(path)) {
+                FSDataOutputStream output = dfs.create(path, true);
+                output.writeBoolean(true);
+                output.flush();
+                output.close();
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+        try {
+            FileSystem dfs = FileSystem.get(conf);
+            String pathStr = TMP_DIR + jobId + "fterm";
+            Path path = new Path(pathStr);
+            if (dfs.exists(path)) {
+                return true;
+            } else {
+                return false;
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+}
diff --git a/pregelix/pregelix-dataflow-std-base/pom.xml b/pregelix/pregelix-dataflow-std-base/pom.xml
index fade226..2685f7f 100644
--- a/pregelix/pregelix-dataflow-std-base/pom.xml
+++ b/pregelix/pregelix-dataflow-std-base/pom.xml
@@ -87,7 +87,7 @@
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>pregelix-api</artifactId>
+			<artifactId>hyracks-dataflow-common</artifactId>
 			<version>0.2.9-SNAPSHOT</version>
 			<type>jar</type>
 			<scope>compile</scope>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 1d7c979..75f8ed8 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.JobStateUtils;
 import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
 import edu.uci.ics.pregelix.dataflow.context.StateKey;
 
@@ -91,22 +92,6 @@
         }
     }
 
-    public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
-        try {
-            FileSystem dfs = FileSystem.get(conf);
-            String pathStr = IterationUtils.TMP_DIR + jobId + "fterm";
-            Path path = new Path(pathStr);
-            if (!dfs.exists(path)) {
-                FSDataOutputStream output = dfs.create(path, true);
-                output.writeBoolean(true);
-                output.flush();
-                output.close();
-            }
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
     public static void writeGlobalAggregateValue(Configuration conf, String jobId, Writable agg)
             throws HyracksDataException {
         try {
@@ -136,19 +121,12 @@
         }
     }
 
+    public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+        JobStateUtils.writeForceTerminationState(conf, jobId);
+    }
+
     public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
-        try {
-            FileSystem dfs = FileSystem.get(conf);
-            String pathStr = IterationUtils.TMP_DIR + jobId + "fterm";
-            Path path = new Path(pathStr);
-            if (dfs.exists(path)) {
-                return true;
-            } else {
-                return false;
-            }
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
+        return JobStateUtils.readForceTerminationState(conf, jobId);
     }
 
     public static Writable readGlobalAggregateValue(Configuration conf, String jobId) throws HyracksDataException {
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/EarlyTerminationVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/EarlyTerminationVertex.java
new file mode 100644
index 0000000..e369d29
--- /dev/null
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/EarlyTerminationVertex.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexWriter;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat;
+import edu.uci.ics.pregelix.api.io.text.TextVertexOutputFormat.TextVertexWriter;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.example.client.Client;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat;
+import edu.uci.ics.pregelix.example.io.VLongWritable;
+
+/**
+ * Demonstrates the basic Pregel PageRank implementation.
+ */
+public class EarlyTerminationVertex extends Vertex<VLongWritable, VLongWritable, VLongWritable, VLongWritable> {
+    private VLongWritable tempValue = new VLongWritable();
+
+    @Override
+    public void compute(Iterator<VLongWritable> msgIterator) {
+        if (getSuperstep() == 1) {
+            if (getVertexId().get() % 4 == 2) {
+                terminatePartition();
+            } else {
+                tempValue.set(1);
+                setVertexValue(tempValue);
+            }
+        }
+        if (getSuperstep() == 2) {
+            if (getVertexId().get() % 4 == 3) {
+                terminatePartition();
+            } else {
+                tempValue.set(2);
+                setVertexValue(tempValue);
+                voteToHalt();
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getVertexId() + " " + getVertexValue();
+    }
+
+    /**
+     * Simple VertexWriter that support
+     */
+    public static class SimpleEarlyTerminattionVertexWriter extends
+            TextVertexWriter<VLongWritable, VLongWritable, VLongWritable> {
+        public SimpleEarlyTerminattionVertexWriter(RecordWriter<Text, Text> lineRecordWriter) {
+            super(lineRecordWriter);
+        }
+
+        @Override
+        public void writeVertex(Vertex<VLongWritable, VLongWritable, VLongWritable, ?> vertex) throws IOException,
+                InterruptedException {
+            getRecordWriter().write(new Text(vertex.getVertexId().toString()),
+                    new Text(vertex.getVertexValue().toString()));
+        }
+    }
+
+    public static class SimpleEarlyTerminattionVertexOutputFormat extends
+            TextVertexOutputFormat<VLongWritable, VLongWritable, VLongWritable> {
+
+        @Override
+        public VertexWriter<VLongWritable, VLongWritable, VLongWritable> createVertexWriter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            RecordWriter<Text, Text> recordWriter = textOutputFormat.getRecordWriter(context);
+            return new SimpleEarlyTerminattionVertexWriter(recordWriter);
+        }
+
+    }
+
+    public static void main(String[] args) throws Exception {
+        PregelixJob job = new PregelixJob(EarlyTerminationVertex.class.getSimpleName());
+        job.setVertexClass(EarlyTerminationVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleEarlyTerminattionVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        job.setDynamicVertexValueSize(true);
+        Client.run(args, job);
+    }
+
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java
index df7f0a5..6c3c752 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowFixedsizeVertex.java
@@ -101,8 +101,8 @@
     }
 
     public static void main(String[] args) throws Exception {
-        PregelixJob job = new PregelixJob(PageRankVertex.class.getSimpleName());
-        job.setVertexClass(PageRankVertex.class);
+        PregelixJob job = new PregelixJob(MessageOverflowFixedsizeVertex.class.getSimpleName());
+        job.setVertexClass(MessageOverflowFixedsizeVertex.class);
         job.setVertexInputFormatClass(TextPageRankInputFormat.class);
         job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
index b20f6e4..d0221bf 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/MessageOverflowVertex.java
@@ -113,8 +113,8 @@
     }
 
     public static void main(String[] args) throws Exception {
-        PregelixJob job = new PregelixJob(PageRankVertex.class.getSimpleName());
-        job.setVertexClass(PageRankVertex.class);
+        PregelixJob job = new PregelixJob(MessageOverflowVertex.class.getSimpleName());
+        job.setVertexClass(MessageOverflowVertex.class);
         job.setVertexInputFormatClass(TextPageRankInputFormat.class);
         job.setVertexOutputFormatClass(SimpleMessageOverflowVertexOutputFormat.class);
         job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
index 48abbc3..1bb33b8 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -116,7 +116,7 @@
     }
 
     @Override
-    public void compute(Iterator<ByteWritable> msgIterator) {
+    public void compute(Iterator<ByteWritable> msgIterator) throws Exception {
         if (sourceId < 0) {
             sourceId = getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
         }
@@ -171,13 +171,20 @@
         return getVertexId() + " " + getVertexValue();
     }
 
-    private void signalTerminate() {
-        Configuration conf = getContext().getConfiguration();
-        try {
-            IterationUtils.writeForceTerminationState(conf, BspUtils.getJobId(conf));
-            writeReachibilityResult(conf, true);
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
+    private void signalTerminate() throws Exception {
+        writeReachibilityResult(getContext().getConfiguration(), true);
+        terminateJob();
+    }
+
+    private void writeReachibilityResult(Configuration conf, boolean terminate) throws IOException {
+        FileSystem dfs = FileSystem.get(conf);
+        String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility";
+        Path path = new Path(pathStr);
+        if (!dfs.exists(path)) {
+            FSDataOutputStream output = dfs.create(path, true);
+            output.writeBoolean(terminate);
+            output.flush();
+            output.close();
         }
     }
 
@@ -187,22 +194,6 @@
         }
     }
 
-    private void writeReachibilityResult(Configuration conf, boolean terminate) {
-        try {
-            FileSystem dfs = FileSystem.get(conf);
-            String pathStr = IterationUtils.TMP_DIR + BspUtils.getJobId(conf) + "reachibility";
-            Path path = new Path(pathStr);
-            if (!dfs.exists(path)) {
-                FSDataOutputStream output = dfs.create(path, true);
-                output.writeBoolean(terminate);
-                output.flush();
-                output.close();
-            }
-        } catch (IOException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
     private static boolean readReachibilityResult(Configuration conf) {
         try {
             FileSystem dfs = FileSystem.get(conf);
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index 88f5a75..13cec61 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -26,6 +26,8 @@
 import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex;
 import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
+import edu.uci.ics.pregelix.example.EarlyTerminationVertex;
+import edu.uci.ics.pregelix.example.EarlyTerminationVertex.SimpleEarlyTerminattionVertexOutputFormat;
 import edu.uci.ics.pregelix.example.GraphMutationVertex;
 import edu.uci.ics.pregelix.example.GraphMutationVertex.SimpleGraphMutationVertexOutputFormat;
 import edu.uci.ics.pregelix.example.MessageOverflowFixedsizeVertex;
@@ -324,6 +326,18 @@
         job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
     }
 
+    private static void generateEarlyTerminationJob(String jobName, String outputPath) throws IOException {
+        PregelixJob job = new PregelixJob(jobName);
+        job.setVertexClass(EarlyTerminationVertex.class);
+        job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+        job.setVertexOutputFormatClass(SimpleEarlyTerminattionVertexOutputFormat.class);
+        job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+        FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+        FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+        job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+        job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+    }
+
     private static void genPageRank() throws IOException {
         generatePageRankJob("PageRank", outputBase + "PageRank.xml");
         generatePageRankJobReal("PageRank", outputBase + "PageRankReal.xml");
@@ -369,6 +383,10 @@
         generateMessageOverflowFixedsizeJob("Message Overflow Fixedsize", outputBase + "MessageOverflowFixedsize.xml");
     }
 
+    private static void genEarlyTermination() throws IOException {
+        generateEarlyTerminationJob("Early Termination", outputBase + "EarlyTermination.xml");
+    }
+
     public static void main(String[] args) throws IOException {
         genPageRank();
         genShortestPath();
@@ -378,5 +396,6 @@
         genMaximalClique();
         genGraphMutation();
         genMessageOverflow();
+        genEarlyTermination();
     }
 }
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
new file mode 100644
index 0000000..d908da8
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/EarlyTermination.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Early Termination</value></property>
+<property><name>pregelix.nmkComputerClass</name><value>edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.EarlyTerminationVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.EarlyTerminationVertex$SimpleEarlyTerminattionVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 1af8519..656adc6 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -168,38 +168,42 @@
                 tbAlive.reset();
 
                 vertex = (Vertex) tuple[3];
-                vertex.setOutputWriters(writers);
-                vertex.setOutputAppenders(appenders);
-                vertex.setOutputTupleBuilders(tbs);
+                if (!vertex.isPartitionTerminated()) {
+                    vertex.setOutputWriters(writers);
+                    vertex.setOutputAppenders(appenders);
+                    vertex.setOutputTupleBuilders(tbs);
 
-                MsgList msgContentList = (MsgList) tuple[1];
-                msgContentList.reset(msgIterator);
+                    MsgList msgContentList = (MsgList) tuple[1];
+                    msgContentList.reset(msgIterator);
 
-                if (!msgIterator.hasNext() && vertex.isHalted()) {
-                    return;
-                }
-                if (vertex.isHalted()) {
-                    vertex.activate();
-                }
-
-                try {
-                    if (msgContentList.segmentStart()) {
-                        vertex.open();
+                    if (!msgIterator.hasNext() && vertex.isHalted()) {
+                        return;
                     }
-                    vertex.compute(msgIterator);
-                    if (msgContentList.segmentEnd()) {
-                        vertex.close();
+                    if (vertex.isHalted()) {
+                        vertex.activate();
                     }
-                    vertex.finishCompute();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
 
-                /**
-                 * this partition should not terminate
-                 */
-                if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
-                    terminate = false;
+                    try {
+                        if (msgContentList.segmentStart()) {
+                            vertex.open();
+                        }
+                        vertex.compute(msgIterator);
+                        if (msgContentList.segmentEnd()) {
+                            vertex.close();
+                        }
+                        vertex.finishCompute();
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+
+                    /**
+                     * this partition should not terminate
+                     */
+                    if (terminate && (!vertex.isHalted() || vertex.hasMessage() || vertex.createdNewLiveVertex()))
+                        terminate = false;
+                } else {
+                    vertex.voteToHalt();
+                }
 
                 aggregator.step(vertex);
             }
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index 8e9aff7..ed4c522 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -172,24 +172,28 @@
                 tbAlive.reset();
 
                 vertex = (Vertex) tuple[1];
-                vertex.setOutputWriters(writers);
-                vertex.setOutputAppenders(appenders);
-                vertex.setOutputTupleBuilders(tbs);
+                if (!vertex.isPartitionTerminated()) {
+                    vertex.setOutputWriters(writers);
+                    vertex.setOutputAppenders(appenders);
+                    vertex.setOutputTupleBuilders(tbs);
 
-                if (!msgIterator.hasNext() && vertex.isHalted()) {
-                    return;
-                }
-                if (vertex.isHalted()) {
-                    vertex.activate();
-                }
+                    if (!msgIterator.hasNext() && vertex.isHalted()) {
+                        return;
+                    }
+                    if (vertex.isHalted()) {
+                        vertex.activate();
+                    }
 
-                try {
-                    vertex.open();
-                    vertex.compute(msgIterator);
-                    vertex.close();
-                    vertex.finishCompute();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
+                    try {
+                        vertex.open();
+                        vertex.compute(msgIterator);
+                        vertex.close();
+                        vertex.finishCompute();
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                } else {
+                    vertex.voteToHalt();
                 }
 
                 /**