support multiple concurrent jobs
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index c0825dd..b7216c8 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -669,7 +669,7 @@
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
-                false);
+                false, jobId, lastSuccessfulIteration + 1);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         String checkpointPath = BspUtils.getMessageCheckpointPath(conf, lastSuccessfulIteration);;
@@ -740,7 +740,7 @@
          * construct the materializing write operator
          */
         MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec,
-                recordDescriptor);
+                recordDescriptor, jobId, lastCheckpointedIteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 1208556..8c61e6a 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -217,7 +217,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         /**
@@ -338,7 +339,7 @@
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
-                true);
+                true, jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
@@ -421,7 +422,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index fcbf9f2..8446379 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -160,7 +160,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
@@ -294,7 +295,7 @@
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
-                true);
+                true, jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
@@ -355,7 +356,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 09c14a0..0259c5c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -155,7 +155,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
@@ -285,7 +286,7 @@
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
-                true);
+                true, jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
@@ -338,7 +339,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 379147c..0a38d72 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -168,7 +168,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
@@ -299,7 +300,7 @@
          * construct the materializing write operator
          */
         MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal,
-                true);
+                true, jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materializeRead);
 
         /**
@@ -368,7 +369,8 @@
         /**
          * construct the materializing write operator
          */
-        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+        MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal,
+                jobId, iteration);
         ClusterConfig.setLocationConstraint(spec, materialize);
 
         /** construct runtime hook */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index aabd4ba..70de9ed 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -62,7 +62,7 @@
         ccConfig.jobHistorySize = 1;
         ccConfig.profileDumpPeriod = -1;
         ccConfig.heartbeatPeriod = 50;
-        ccConfig.maxHeartbeatLapsePeriods = 15;
+        ccConfig.maxHeartbeatLapsePeriods = 10;
 
         // cluster controller
         cc = new ClusterControllerService(ccConfig);
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
index b44b643..a5d2ab7 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
@@ -31,11 +31,15 @@
 public class MaterializingReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
     private final boolean removeIterationState;
+    private final String jobId;
+    private final int iteration;
 
     public MaterializingReadOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor,
-            boolean removeIterationState) {
+            boolean removeIterationState, String jobId, int iteration) {
         super(spec, 1, 1);
         this.removeIterationState = removeIterationState;
+        this.jobId = jobId;
+        this.iteration = iteration - 1;
         recordDescriptors[0] = recordDescriptor;
     }
 
@@ -55,8 +59,8 @@
             @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 if (!complete) {
-                    MaterializerTaskState state = (MaterializerTaskState) IterationUtils.getIterationState(ctx,
-                            partition);
+                    MaterializerTaskState state = (MaterializerTaskState) IterationUtils.getIterationState(ctx, jobId,
+                            partition, iteration);
                     RunFileReader in = state.getRunFileWriter().createReader();
                     writer.open();
                     try {
@@ -85,7 +89,7 @@
                  * remove last iteration's state
                  */
                 if (removeIterationState) {
-                    IterationUtils.removeIterationState(ctx, partition);
+                    IterationUtils.removeIterationState(ctx, jobId, partition, iteration);
                 }
                 writer.close();
                 complete = true;
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
index 00dcbd1..921dc40 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingWriteOperatorDescriptor.java
@@ -38,9 +38,14 @@
 public class MaterializingWriteOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final long serialVersionUID = 1L;
     private final static int MATERIALIZER_ACTIVITY_ID = 0;
+    private final String jobId;
+    private final int iteration;
 
-    public MaterializingWriteOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor) {
+    public MaterializingWriteOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor, String jobId,
+            int iteration) {
         super(spec, 1, 1);
+        this.jobId = jobId;
+        this.iteration = iteration;
         recordDescriptors[0] = recordDescriptor;
     }
 
@@ -69,13 +74,12 @@
                 @Override
                 public void open() throws HyracksDataException {
                     /** remove last iteration's state */
-                    IterationUtils.removeIterationState(ctx, partition);
+                    IterationUtils.removeIterationState(ctx, jobId, partition, iteration);
                     state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
                             partition));
                     INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
                     RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-                    FileReference file = context.createManagedWorkspaceFile(MaterializingWriteOperatorDescriptor.class
-                            .getSimpleName());
+                    FileReference file = context.createManagedWorkspaceFile(jobId);
                     state.setRunFileWriter(new RunFileWriter(file, ctx.getIOManager()));
                     state.getRunFileWriter().open();
                     writer.open();
@@ -92,7 +96,7 @@
                     /**
                      * set iteration state
                      */
-                    IterationUtils.setIterationState(ctx, partition, state);
+                    IterationUtils.setIterationState(ctx, jobId, partition, iteration, state);
                     writer.close();
                 }
 
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/PJobContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/PJobContext.java
new file mode 100644
index 0000000..15836e6
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/PJobContext.java
@@ -0,0 +1,130 @@
+package edu.uci.ics.pregelix.dataflow.context;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+
+public class PJobContext {
+    private static final Logger LOGGER = Logger.getLogger(RuntimeContext.class.getName());
+
+    private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+    private final Map<TaskIterationID, IStateObject> appStateMap = new ConcurrentHashMap<TaskIterationID, IStateObject>();
+    private final Map<String, Long> jobIdToSuperStep = new ConcurrentHashMap<String, Long>();
+    private final Map<String, Boolean> jobIdToMove = new ConcurrentHashMap<String, Boolean>();
+
+    public void close() throws HyracksDataException {
+        for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
+            for (FileReference fileRef : entry.getValue())
+                fileRef.delete();
+
+        iterationToFiles.clear();
+        appStateMap.clear();
+    }
+
+    public void clearState(String jobId) throws HyracksDataException {
+        for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
+            for (FileReference fileRef : entry.getValue())
+                fileRef.delete();
+
+        iterationToFiles.clear();
+        appStateMap.remove(jobId);
+        jobIdToMove.remove(jobId);
+        jobIdToSuperStep.remove(jobId);
+    }
+
+    public Map<TaskIterationID, IStateObject> getAppStateStore() {
+        return appStateMap;
+    }
+
+    public static RuntimeContext get(IHyracksTaskContext ctx) {
+        return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+    }
+
+    public void setVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration, ClassLoader cl) {
+        Boolean toMove = jobIdToMove.get(jobId);
+        if (toMove == null || toMove == true) {
+            if (jobIdToSuperStep.get(jobId) == null) {
+                if (currentIteration <= 0) {
+                    jobIdToSuperStep.put(jobId, 0L);
+                } else {
+                    jobIdToSuperStep.put(jobId, currentIteration);
+                }
+            }
+
+            long superStep = jobIdToSuperStep.get(jobId);
+            List<FileReference> files = iterationToFiles.remove(superStep - 1);
+            if (files != null) {
+                for (FileReference fileRef : files)
+                    fileRef.delete();
+            }
+
+            setProperties(jobId, numVertices, numEdges, currentIteration, superStep, false, cl);
+        }
+        System.gc();
+    }
+
+    public void recoverVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration,
+            ClassLoader cl) {
+        if (jobIdToSuperStep.get(jobId) == null) {
+            if (currentIteration <= 0) {
+                jobIdToSuperStep.put(jobId, 0L);
+            } else {
+                jobIdToSuperStep.put(jobId, currentIteration);
+            }
+        }
+
+        long superStep = jobIdToSuperStep.get(jobId);
+        List<FileReference> files = iterationToFiles.remove(superStep - 1);
+        if (files != null) {
+            for (FileReference fileRef : files)
+                fileRef.delete();
+        }
+
+        setProperties(jobId, numVertices, numEdges, currentIteration, superStep, true, cl);
+    }
+
+    public void endSuperStep(String pregelixJobId) {
+        jobIdToMove.put(pregelixJobId, true);
+        LOGGER.info("end iteration " + Vertex.getSuperstep());
+    }
+
+    public Map<Long, List<FileReference>> getIterationToFiles() {
+        return iterationToFiles;
+    }
+
+    private void setProperties(String jobId, long numVertices, long numEdges, long currentIteration, long superStep,
+            boolean toMove, ClassLoader cl) {
+        try {
+            Class<?> vClass = (Class<?>) cl.loadClass("edu.uci.ics.pregelix.api.graph.Vertex");
+            Method superStepMethod = vClass.getMethod("setSuperstep", Long.TYPE);
+            Method numVerticesMethod = vClass.getMethod("setNumVertices", Long.TYPE);
+            Method numEdgesMethod = vClass.getMethod("setNumEdges", Long.TYPE);
+
+            if (currentIteration > 0) {
+                //Vertex.setSuperstep(currentIteration);
+                superStepMethod.invoke(null, currentIteration);
+            } else {
+                //Vertex.setSuperstep(++superStep);
+                superStepMethod.invoke(null, ++superStep);
+            }
+            //Vertex.setNumVertices(numVertices);
+            numVerticesMethod.invoke(null, numVertices);
+            //Vertex.setNumEdges(numEdges);
+            numEdgesMethod.invoke(null, numEdges);
+            jobIdToSuperStep.put(jobId, superStep);
+            jobIdToMove.put(jobId, toMove);
+            LOGGER.info("start iteration " + Vertex.getSuperstep());
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index f3f7513..5c44d65 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -20,7 +20,6 @@
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
-import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -48,7 +47,6 @@
 import edu.uci.ics.pregelix.api.graph.Vertex;
 
 public class RuntimeContext implements IWorkspaceFileFactory {
-    private static final Logger LOGGER = Logger.getLogger(RuntimeContext.class.getName());
 
     private final IIndexLifecycleManager lcManager;
     private final ILocalResourceRepository localResourceRepository;
@@ -57,10 +55,7 @@
     private final List<IVirtualBufferCache> vbcs;
     private final IFileMapManager fileMapManager;
     private final IOManager ioManager;
-    private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
-    private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
-    private final Map<String, Long> jobIdToSuperStep = new ConcurrentHashMap<String, Long>();
-    private final Map<String, Boolean> jobIdToMove = new ConcurrentHashMap<String, Boolean>();
+    private final Map<String, PJobContext> activeJobs = new ConcurrentHashMap<String, PJobContext>();
 
     private final ThreadFactory threadFactory = new ThreadFactory() {
         public Thread newThread(Runnable r) {
@@ -92,27 +87,11 @@
     }
 
     public void close() throws HyracksDataException {
-        for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
-            for (FileReference fileRef : entry.getValue())
-                fileRef.delete();
-
-        iterationToFiles.clear();
         bufferCache.close();
-        appStateMap.clear();
-
-        System.gc();
-    }
-
-    public void clearState(String jobId) throws HyracksDataException {
-        for (Entry<Long, List<FileReference>> entry : iterationToFiles.entrySet())
-            for (FileReference fileRef : entry.getValue())
-                fileRef.delete();
-
-        iterationToFiles.clear();
-        appStateMap.clear();
-        jobIdToMove.remove(jobId);
-        jobIdToSuperStep.remove(jobId);
-        System.gc();
+        for (Entry<String, PJobContext> entry : activeJobs.entrySet()) {
+            entry.getValue().close();
+        }
+        activeJobs.clear();
     }
 
     public ILocalResourceRepository getLocalResourceRepository() {
@@ -139,87 +118,55 @@
         return fileMapManager;
     }
 
-    public Map<StateKey, IStateObject> getAppStateStore() {
-        return appStateMap;
+    public Map<TaskIterationID, IStateObject> getAppStateStore(String jobId) {
+        PJobContext activeJob = getActiveJob(jobId);
+        return activeJob.getAppStateStore();
     }
 
     public static RuntimeContext get(IHyracksTaskContext ctx) {
         return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
     }
 
-    public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration) {
-        Boolean toMove = jobIdToMove.get(jobId);
-        if (toMove == null || toMove == true) {
-            if (jobIdToSuperStep.get(jobId) == null) {
-                if (currentIteration <= 0) {
-                    jobIdToSuperStep.put(jobId, 0L);
-                } else {
-                    jobIdToSuperStep.put(jobId, currentIteration);
-                }
-            }
-
-            long superStep = jobIdToSuperStep.get(jobId);
-            List<FileReference> files = iterationToFiles.remove(superStep - 1);
-            if (files != null) {
-                for (FileReference fileRef : files)
-                    fileRef.delete();
-            }
-
-            if (currentIteration > 0) {
-                Vertex.setSuperstep(currentIteration);
-            } else {
-                Vertex.setSuperstep(++superStep);
-            }
-            Vertex.setNumVertices(numVertices);
-            Vertex.setNumEdges(numEdges);
-            jobIdToSuperStep.put(jobId, superStep);
-            jobIdToMove.put(jobId, false);
-            LOGGER.info("start iteration " + Vertex.getSuperstep());
-        }
-        System.gc();
+    public synchronized void setVertexProperties(String jobId, long numVertices, long numEdges, long currentIteration,
+            ClassLoader cl) {
+        PJobContext activeJob = getActiveJob(jobId);
+        activeJob.setVertexProperties(jobId, numVertices, numEdges, currentIteration, cl);
     }
 
     public synchronized void recoverVertexProperties(String jobId, long numVertices, long numEdges,
-            long currentIteration) {
-        if (jobIdToSuperStep.get(jobId) == null) {
-            if (currentIteration <= 0) {
-                jobIdToSuperStep.put(jobId, 0L);
-            } else {
-                jobIdToSuperStep.put(jobId, currentIteration);
-            }
-        }
-
-        long superStep = jobIdToSuperStep.get(jobId);
-        List<FileReference> files = iterationToFiles.remove(superStep - 1);
-        if (files != null) {
-            for (FileReference fileRef : files)
-                fileRef.delete();
-        }
-
-        if (currentIteration > 0) {
-            Vertex.setSuperstep(currentIteration);
-        } else {
-            Vertex.setSuperstep(++superStep);
-        }
-        Vertex.setNumVertices(numVertices);
-        Vertex.setNumEdges(numEdges);
-        jobIdToSuperStep.put(jobId, superStep);
-        jobIdToMove.put(jobId, true);
-        LOGGER.info("recovered iteration " + Vertex.getSuperstep());
+            long currentIteration, ClassLoader cl) {
+        PJobContext activeJob = getActiveJob(jobId);
+        activeJob.recoverVertexProperties(jobId, numVertices, numEdges, currentIteration, cl);
     }
 
-    public synchronized void endSuperStep(String pregelixJobId) {
-        jobIdToMove.put(pregelixJobId, true);
-        LOGGER.info("end iteration " + Vertex.getSuperstep());
+    public synchronized void endSuperStep(String jobId) {
+        PJobContext activeJob = getActiveJob(jobId);
+        activeJob.endSuperStep(jobId);
+    }
+
+    public void clearState(String jobId) throws HyracksDataException {
+        PJobContext activeJob = getActiveJob(jobId);
+        activeJob.clearState(jobId);
+        activeJobs.remove(jobId);
+    }
+
+    private PJobContext getActiveJob(String jobId) {
+        PJobContext activeJob = activeJobs.get(jobId);
+        if (activeJob == null) {
+            activeJob = new PJobContext();
+            activeJobs.put(jobId, activeJob);
+        }
+        return activeJob;
     }
 
     @Override
-    public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
-        final FileReference fRef = ioManager.createWorkspaceFile(prefix);
-        List<FileReference> files = iterationToFiles.get(Vertex.getSuperstep());
+    public FileReference createManagedWorkspaceFile(String jobId) throws HyracksDataException {
+        final FileReference fRef = ioManager.createWorkspaceFile(jobId);
+        PJobContext activeJob = getActiveJob(jobId);
+        List<FileReference> files = activeJob.getIterationToFiles().get(Vertex.getSuperstep());
         if (files == null) {
             files = new ArrayList<FileReference>();
-            iterationToFiles.put(Vertex.getSuperstep(), files);
+            activeJob.getIterationToFiles().put(Vertex.getSuperstep(), files);
         }
         files.add(fRef);
         return fRef;
@@ -229,4 +176,5 @@
     public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
         return ioManager.createWorkspaceFile(prefix);
     }
+
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskID.java
similarity index 64%
rename from pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java
rename to pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskID.java
index cbd90b9..f219ed2 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/StateKey.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskID.java
@@ -12,34 +12,44 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package edu.uci.ics.pregelix.dataflow.context;
 
-import edu.uci.ics.hyracks.api.job.JobId;
+public class TaskID {
 
-public class StateKey {
-    private final JobId jobId;
-    private final int partition;
+    private String jobId;
+    private int partition;
 
-    public StateKey(JobId jobId, int partition) {
+    public TaskID(String jobId, int partition) {
         this.jobId = jobId;
         this.partition = partition;
     }
 
+    public String getJobId() {
+        return jobId;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
     @Override
     public int hashCode() {
-        return jobId.hashCode() * partition;
+        return jobId.hashCode() + partition;
     }
 
     @Override
     public boolean equals(Object o) {
-        if (!(o instanceof StateKey))
+        if (!(o instanceof TaskID)) {
             return false;
-        StateKey key = (StateKey) o;
-        return key.jobId.equals(jobId) && key.partition == partition;
+        }
+        TaskID tid = (TaskID) o;
+        return jobId.equals(tid.getJobId()) && partition == tid.getPartition();
     }
 
     @Override
     public String toString() {
-        return jobId.toString() + ":" + partition;
+        return "job:" + jobId + " partition:" + partition;
     }
+
 }
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskIterationID.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskIterationID.java
new file mode 100644
index 0000000..53c6a0c
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/TaskIterationID.java
@@ -0,0 +1,51 @@
+package edu.uci.ics.pregelix.dataflow.context;
+
+public class TaskIterationID {
+
+    private TaskID tid;
+    private int iteration;
+
+    public TaskIterationID(TaskID tid, int iteration) {
+        this.tid = tid;
+        this.iteration = iteration;
+    }
+
+    public TaskIterationID(String jobId, int partition, int iteration) {
+        this.tid = new TaskID(jobId, partition);
+        this.iteration = iteration;
+    }
+
+    public TaskID getTaskID() {
+        return tid;
+    }
+
+    public int getIteration() {
+        return iteration;
+    }
+
+    public TaskIterationID getNextTaskIterationID() {
+        return new TaskIterationID(tid, iteration + 1);
+    }
+
+    public TaskIterationID getPreviousTaskIterationID() {
+        return new TaskIterationID(tid, iteration - 1);
+    }
+
+    @Override
+    public int hashCode() {
+        return tid.hashCode() + iteration;
+    }
+
+    public boolean equals(Object o) {
+        if (!(o instanceof TaskIterationID)) {
+            return false;
+        }
+        TaskIterationID tiid = (TaskIterationID) o;
+        return tid.equals(tiid.getTaskID()) && iteration == tiid.getIteration();
+    }
+
+    @Override
+    public String toString() {
+        return tid.toString() + " iteration:" + iteration;
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
index 02097bf..8052b7c 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/util/IterationUtils.java
@@ -28,73 +28,66 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.api.util.BspUtils;
 import edu.uci.ics.pregelix.api.util.JobStateUtils;
 import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
-import edu.uci.ics.pregelix.dataflow.context.StateKey;
+import edu.uci.ics.pregelix.dataflow.context.TaskIterationID;
 
 public class IterationUtils {
     public static final String TMP_DIR = "/tmp/";
 
-    public static void setIterationState(IHyracksTaskContext ctx, int partition, IStateObject state) {
+    public static void setIterationState(IHyracksTaskContext ctx, String pregelixJobId, int partition, int iteration,
+            IStateObject state) {
         INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
         RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-        Map<StateKey, IStateObject> map = context.getAppStateStore();
-        map.put(new StateKey(ctx.getJobletContext().getJobId(), partition), state);
+        Map<TaskIterationID, IStateObject> map = context.getAppStateStore(pregelixJobId);
+        map.put(new TaskIterationID(pregelixJobId, partition, iteration), state);
     }
 
-    public static IStateObject getIterationState(IHyracksTaskContext ctx, int partition) {
-        JobId currentId = ctx.getJobletContext().getJobId();
-        JobId lastId = new JobId(currentId.getId() - 1);
+    public static IStateObject getIterationState(IHyracksTaskContext ctx, String pregelixJobId, int partition,
+            int iteration) {
         INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
         RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-        Map<StateKey, IStateObject> map = context.getAppStateStore();
-        IStateObject state = map.get(new StateKey(lastId, partition));
-        while (state == null) {
-            /** in case the last job is a checkpointing job */
-            lastId = new JobId(lastId.getId() - 1);
-            state = map.get(new StateKey(lastId, partition));
-        }
+        Map<TaskIterationID, IStateObject> map = context.getAppStateStore(pregelixJobId);
+        IStateObject state = map.get(new TaskIterationID(pregelixJobId, partition, iteration));
         return state;
     }
 
-    public static void removeIterationState(IHyracksTaskContext ctx, int partition) {
-        JobId currentId = ctx.getJobletContext().getJobId();
-        JobId lastId = new JobId(currentId.getId() - 1);
+    public static void removeIterationState(IHyracksTaskContext ctx, String pregelixJobId, int partition, int iteration) {
         INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
         RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-        Map<StateKey, IStateObject> map = context.getAppStateStore();
-        map.remove(new StateKey(lastId, partition));
+        Map<TaskIterationID, IStateObject> map = context.getAppStateStore(pregelixJobId);
+        map.remove(new TaskIterationID(pregelixJobId, partition, iteration));
     }
 
-    public static void endSuperStep(String giraphJobId, IHyracksTaskContext ctx) {
+    public static void endSuperStep(String pregelixJobId, IHyracksTaskContext ctx) {
         INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
         RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-        context.endSuperStep(giraphJobId);
+        context.endSuperStep(pregelixJobId);
     }
 
-    public static void setProperties(String jobId, IHyracksTaskContext ctx, Configuration conf, long currentIteration) {
-        INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
-        RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-        context.setVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
-                conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
-    }
-
-    public static void recoverProperties(String jobId, IHyracksTaskContext ctx, Configuration conf,
+    public static void setProperties(String pregelixJobId, IHyracksTaskContext ctx, Configuration conf,
             long currentIteration) {
         INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
         RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
-        context.recoverVertexProperties(jobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
-                conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration);
+        context.setVertexProperties(pregelixJobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+                conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration, ctx.getJobletContext().getClassLoader());
     }
 
-    public static void writeTerminationState(Configuration conf, String jobId, boolean terminate)
+    public static void recoverProperties(String pregelixJobId, IHyracksTaskContext ctx, Configuration conf,
+            long currentIteration) {
+        INCApplicationContext appContext = ctx.getJobletContext().getApplicationContext();
+        RuntimeContext context = (RuntimeContext) appContext.getApplicationObject();
+        context.recoverVertexProperties(pregelixJobId, conf.getLong(PregelixJob.NUM_VERTICE, -1),
+                conf.getLong(PregelixJob.NUM_EDGES, -1), currentIteration, ctx.getJobletContext().getClassLoader());
+    }
+
+    public static void writeTerminationState(Configuration conf, String pregelixJobId, boolean terminate)
             throws HyracksDataException {
         try {
             FileSystem dfs = FileSystem.get(conf);
-            String pathStr = IterationUtils.TMP_DIR + jobId;
+            String pathStr = IterationUtils.TMP_DIR + pregelixJobId;
             Path path = new Path(pathStr);
             FSDataOutputStream output = dfs.create(path, true);
             output.writeBoolean(terminate);
@@ -105,11 +98,11 @@
         }
     }
 
-    public static void writeGlobalAggregateValue(Configuration conf, String jobId, Writable agg)
+    public static void writeGlobalAggregateValue(Configuration conf, String pregelixJobId, Writable agg)
             throws HyracksDataException {
         try {
             FileSystem dfs = FileSystem.get(conf);
-            String pathStr = IterationUtils.TMP_DIR + jobId + "agg";
+            String pathStr = IterationUtils.TMP_DIR + pregelixJobId + "agg";
             Path path = new Path(pathStr);
             FSDataOutputStream output = dfs.create(path, true);
             agg.write(output);
@@ -120,10 +113,10 @@
         }
     }
 
-    public static boolean readTerminationState(Configuration conf, String jobId) throws HyracksDataException {
+    public static boolean readTerminationState(Configuration conf, String pregelixJobId) throws HyracksDataException {
         try {
             FileSystem dfs = FileSystem.get(conf);
-            String pathStr = IterationUtils.TMP_DIR + jobId;
+            String pathStr = IterationUtils.TMP_DIR + pregelixJobId;
             Path path = new Path(pathStr);
             FSDataInputStream input = dfs.open(path);
             boolean terminate = input.readBoolean();
@@ -134,8 +127,8 @@
         }
     }
 
-    public static void writeForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
-        JobStateUtils.writeForceTerminationState(conf, jobId);
+    public static void writeForceTerminationState(Configuration conf, String pregelixJobId) throws HyracksDataException {
+        JobStateUtils.writeForceTerminationState(conf, pregelixJobId);
     }
 
     public static boolean readForceTerminationState(Configuration conf, String jobId) throws HyracksDataException {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobTest.java
new file mode 100644
index 0000000..3f42c0d
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/MultiJobTest.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.example;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.ConservativeCheckpointHook;
+import edu.uci.ics.pregelix.api.util.DefaultVertexPartitioner;
+import edu.uci.ics.pregelix.core.driver.Driver;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.example.ConnectedComponentsVertex.SimpleConnectedComponentsVertexOutputFormat;
+import edu.uci.ics.pregelix.example.data.VLongNormalizedKeyComputer;
+import edu.uci.ics.pregelix.example.inputformat.TextConnectedComponentsInputFormat;
+import edu.uci.ics.pregelix.example.util.TestCluster;
+import edu.uci.ics.pregelix.example.util.TestUtils;
+
+/**
+ * This test case tests multi-user workload.
+ * 
+ * @author yingyib
+ */
+public class MultiJobTest {
+    private static String INPUTPATH = "data/webmapcomplex";
+    private static String OUTPUTPAH = "actual/result";
+    private static String OUTPUTPAH2 = "actual/result2";
+    private static String EXPECTEDPATH = "src/test/resources/expected/ConnectedComponentsRealComplex";
+
+    @Test
+    public void test() throws Exception {
+        TestCluster testCluster = new TestCluster();
+        try {
+            PregelixJob job = new PregelixJob(ConnectedComponentsVertex.class.getName());
+            job.setVertexClass(ConnectedComponentsVertex.class);
+            job.setVertexClass(ConnectedComponentsVertex.class);
+            job.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
+            job.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
+            job.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+            job.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+            job.setVertexPartitionerClass(DefaultVertexPartitioner.class);
+            job.setDynamicVertexValueSize(true);
+            FileInputFormat.setInputPaths(job, INPUTPATH);
+            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPAH));
+            job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+            job.setCheckpointHook(ConservativeCheckpointHook.class);
+
+            testCluster.setUp();
+            Thread thread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        Driver driver = new Driver(PageRankVertex.class);
+                        PregelixJob job2 = new PregelixJob(ConnectedComponentsVertex.class.getName());
+                        job2.setVertexClass(ConnectedComponentsVertex.class);
+                        job2.setVertexClass(ConnectedComponentsVertex.class);
+                        job2.setVertexInputFormatClass(TextConnectedComponentsInputFormat.class);
+                        job2.setVertexOutputFormatClass(SimpleConnectedComponentsVertexOutputFormat.class);
+                        job2.setMessageCombinerClass(ConnectedComponentsVertex.SimpleMinCombiner.class);
+                        job2.setNoramlizedKeyComputerClass(VLongNormalizedKeyComputer.class);
+                        job2.setVertexPartitionerClass(DefaultVertexPartitioner.class);
+                        job2.setDynamicVertexValueSize(true);
+                        FileInputFormat.setInputPaths(job2, INPUTPATH);
+                        FileOutputFormat.setOutputPath(job2, new Path(OUTPUTPAH2));
+                        job2.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 23);
+                        job2.setCheckpointHook(ConservativeCheckpointHook.class);
+                        driver.runJob(job2, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+                        TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH2));
+                    } catch (Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+            });
+            thread.start();
+            Driver driver = new Driver(PageRankVertex.class);
+            driver.runJob(job, "127.0.0.1", PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT);
+            TestUtils.compareWithResultDir(new File(EXPECTEDPATH), new File(OUTPUTPAH));
+            thread.join();
+        } catch (Exception e) {
+            PregelixHyracksIntegrationUtil.deinit();
+            testCluster.cleanupHDFS();
+            throw e;
+        }
+    }
+}