add insert/delete vertex support

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2697 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index f311885..b7f9e3d 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -442,6 +442,25 @@
     }
 
     /**
+     * Add a new vertex into the graph
+     * 
+     * @param vertexId the vertex id
+     * @param vertex the vertex
+     */
+    public final void addVertex(I vertexId, V vertex) {
+        delegate.addVertex(vertexId, vertex);
+    }
+
+    /**
+     * Delete a vertex from id
+     * 
+     * @param vertexId  the vertex id
+     */
+    public final void deleteVertex(I vertexId) {
+        delegate.deleteVertex(vertexId);
+    }
+
+    /**
      * Allocate a vertex value from the object pool
      * 
      * @return a vertex value instance
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
index 7267f30..d949bc5 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
@@ -44,6 +44,16 @@
     private IFrameWriter aliveWriter;
     private FrameTupleAppender appenderAlive;
 
+    /** the tuple for insert */
+    private ArrayTupleBuilder insertTb;
+    private IFrameWriter insertWriter;
+    private FrameTupleAppender appenderInsert;
+
+    /** the tuple for insert */
+    private ArrayTupleBuilder deleteTb;
+    private IFrameWriter deleteWriter;
+    private FrameTupleAppender appenderDelete;
+
     /** message list */
     private MsgList dummyMessageList = new MsgList();
     /** whether alive message should be pushed out */
@@ -95,25 +105,57 @@
         this.vertexId = vertexId;
     }
 
+    public final void addVertex(I vertexId, V vertex) {
+        try {
+            insertTb.reset();
+            DataOutput outputInsert = insertTb.getDataOutput();
+            vertexId.write(outputInsert);
+            insertTb.addFieldEndOffset();
+            vertex.write(outputInsert);
+            insertTb.addFieldEndOffset();
+            FrameTupleUtils.flushTuple(appenderInsert, insertTb, insertWriter);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public final void deleteVertex(I vertexId) {
+        try {
+            deleteTb.reset();
+            DataOutput outputDelete = deleteTb.getDataOutput();
+            vertexId.write(outputDelete);
+            deleteTb.addFieldEndOffset();
+            FrameTupleUtils.flushTuple(appenderDelete, deleteTb, deleteWriter);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
     public final void setOutputWriters(List<IFrameWriter> outputs) {
         msgWriter = outputs.get(0);
-        if (outputs.size() > 1) {
-            aliveWriter = outputs.get(1);
+        insertWriter = outputs.get(1);
+        deleteWriter = outputs.get(2);
+        if (outputs.size() > 3) {
+            aliveWriter = outputs.get(outputs.size() - 1);
             pushAlive = true;
         }
     }
 
     public final void setOutputAppenders(List<FrameTupleAppender> appenders) {
         appenderMsg = appenders.get(0);
-        if (appenders.size() > 1) {
-            appenderAlive = appenders.get(1);
+        appenderInsert = appenders.get(1);
+        appenderDelete = appenders.get(2);
+        if (appenders.size() > 3) {
+            appenderAlive = appenders.get(appenders.size() - 1);
         }
     }
 
     public final void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
         message = tbs.get(0);
-        if (tbs.size() > 1) {
-            alive = tbs.get(1);
+        insertTb = tbs.get(1);
+        deleteTb = tbs.get(2);
+        if (tbs.size() > 3) {
+            alive = tbs.get(tbs.size() - 1);
         }
     }
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 04a6e42..727e7fe 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -39,6 +39,9 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -127,13 +130,16 @@
                 MsgList.class.getName());
         RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), messageValueClass.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 4,
+                new BTreeDataflowHelperFactory(), inputRdFactory, 6,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate, rdFinal);
+                rdPartialAggregate, rdInsert, rdDelete, rdFinal);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -212,9 +218,36 @@
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
         ClusterConfig.setLocationConstraint(spec, emptySink);
 
+        /**
+         * add the insert operator to insert vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
+
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -223,7 +256,18 @@
                 terminateWriter, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
                 finalAggregator, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 3, btreeBulkLoad, 0);
+
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 5, btreeBulkLoad, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
                 localGby, 0, globalGby, 0);
@@ -235,6 +279,8 @@
         spec.addRoot(btreeBulkLoad);
         spec.addRoot(terminateWriter);
         spec.addRoot(finalAggregator);
+        spec.addRoot(emptySink3);
+        spec.addRoot(emptySink4);
 
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         spec.setFrameSize(frameSize);
@@ -261,6 +307,9 @@
                 .getClass());
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -316,8 +365,8 @@
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 4, new ComputeUpdateFunctionFactory(confFactory),
-                preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdFinal);
+                new BTreeDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
+                preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -395,6 +444,32 @@
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
 
+        /**
+         * add the insert operator to insert vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
+
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
@@ -406,10 +481,18 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
                 terminateWriter, 0);
-
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
                 finalAggregator, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), join, 3, btreeBulkLoad, 0);
+
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 5, btreeBulkLoad, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
                 localGby, 0, globalGby, 0);
@@ -420,6 +503,8 @@
         spec.addRoot(emptySink);
         spec.addRoot(btreeBulkLoad);
         spec.addRoot(terminateWriter);
+        spec.addRoot(emptySink3);
+        spec.addRoot(emptySink4);
 
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         spec.setFrameSize(frameSize);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 35f5e61..9bad169 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -39,6 +39,9 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -121,13 +124,16 @@
                 vertexIdClass.getName(), vertexClass.getName());
         RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), messageValueClass.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+                new BTreeDataflowHelperFactory(), inputRdFactory, 5,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate);
+                rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -146,7 +152,8 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false, false);
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+                false);
         PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
@@ -156,8 +163,8 @@
          */
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
-                .getAccumulatingAggregatorFactory(conf, true, true);
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                true, true);
         PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -193,6 +200,33 @@
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
 
+        /**
+         * add the insert operator to insert vertexes
+         */
+        int[] fieldPermutation = new int[] { 0, 1 };
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
+
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
         /** connect all operators **/
@@ -203,6 +237,20 @@
                 terminateWriter, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
                 finalAggregator, 0);
+
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
+        /**
+         * connect the group-by operator
+         */
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
                 localGby, 0, globalGby, 0);
@@ -213,6 +261,8 @@
         spec.addRoot(terminateWriter);
         spec.addRoot(finalAggregator);
         spec.addRoot(emptySink2);
+        spec.addRoot(emptySink3);
+        spec.addRoot(emptySink4);
 
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         spec.setFrameSize(frameSize);
@@ -239,6 +289,9 @@
                 .getClass());
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -286,9 +339,9 @@
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
-                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate);
+                rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -306,7 +359,8 @@
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false, false);
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+                false);
         PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
@@ -314,8 +368,8 @@
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
-                .getAccumulatingAggregatorFactory(conf, true, true);
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                true, true);
         PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -351,6 +405,33 @@
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
 
+        /**
+         * add the insert operator to insert vertexes
+         */
+        int[] fieldPermutation = new int[] { 0, 1 };
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
+
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
@@ -364,6 +445,15 @@
                 terminateWriter, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
                 finalAggregator, 0);
+
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
                 localGby, 0, globalGby, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 328d7d1..ffdef10 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -38,6 +38,9 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -123,10 +126,14 @@
                 vertexIdClass.getName(), vertexClass.getName());
         RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), messageValueClass.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+                new BTreeDataflowHelperFactory(), inputRdFactory, 5,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, scanner);
@@ -185,6 +192,33 @@
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
 
+        /**
+         * add the insert operator to insert vertexes
+         */
+        int[] fieldPermutation = new int[] { 0, 1 };
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
+
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
@@ -196,6 +230,17 @@
                 terminateWriter, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
                 finalAggregator, 0);
+
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
         spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
@@ -204,6 +249,8 @@
         spec.addRoot(terminateWriter);
         spec.addRoot(finalAggregator);
         spec.addRoot(emptySink2);
+        spec.addRoot(emptySink3);
+        spec.addRoot(emptySink4);
 
         spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
         spec.setFrameSize(frameSize);
@@ -230,6 +277,9 @@
                 .getClass());
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -277,7 +327,7 @@
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
-                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, join);
@@ -333,6 +383,31 @@
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+        
+
+        int[] fieldPermutation = new int[] { 0, 1 };
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
@@ -347,6 +422,16 @@
                 terminateWriter, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
                 finalAggregator, 0);
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+        
         spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 812e697..cc12523 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -38,6 +38,9 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -119,10 +122,14 @@
                 vertexIdClass.getName(), vertexClass.getName());
         RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), messageValueClass.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+                new BTreeDataflowHelperFactory(), inputRdFactory, 5,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, scanner);
@@ -198,6 +205,33 @@
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
 
+        /**
+         * add the insert operator to insert vertexes
+         */
+        int[] fieldPermutation = new int[] { 0, 1 };
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
+
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
         /** connect all operators **/
@@ -208,6 +242,16 @@
                 terminateWriter, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
                 finalAggregator, 0);
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
@@ -218,6 +262,8 @@
         spec.addRoot(terminateWriter);
         spec.addRoot(finalAggregator);
         spec.addRoot(emptySink2);
+        spec.addRoot(emptySink3);
+        spec.addRoot(emptySink4);
 
         spec.setFrameSize(frameSize);
         return spec;
@@ -243,6 +289,9 @@
                 .getClass());
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -290,7 +339,7 @@
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
-                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, join);
@@ -363,6 +412,33 @@
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
 
+        /**
+         * add the insert operator to insert vertexes
+         */
+        int[] fieldPermutation = new int[] { 0, 1 };
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
+
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
@@ -376,6 +452,14 @@
                 terminateWriter, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
                 finalAggregator, 0);
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
@@ -386,6 +470,8 @@
         spec.addRoot(terminateWriter);
         spec.addRoot(finalAggregator);
         spec.addRoot(emptySink);
+        spec.addRoot(emptySink3);
+        spec.addRoot(emptySink4);
 
         spec.setFrameSize(frameSize);
         return spec;
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index ed04746..2a2e2bf 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -51,7 +51,8 @@
         ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
         ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
         ccConfig.defaultMaxJobAttempts = 0;
-        ccConfig.jobHistorySize = 10;
+        ccConfig.jobHistorySize = 0;
+        ccConfig.profileDumpPeriod = -1;
 
         // cluster controller
         cc = new ClusterControllerService(ccConfig);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 5d8eaf6..1b8fce4 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -59,6 +59,8 @@
             private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
             private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
             private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
+            private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
 
             // for writing out to message channel
             private IFrameWriter writerMsg;
@@ -83,6 +85,16 @@
             private ByteBuffer bufferGlobalAggregate;
             private GlobalAggregator aggregator;
 
+            // for writing out to insert vertex channel
+            private IFrameWriter writerInsert;
+            private FrameTupleAppender appenderInsert;
+            private ByteBuffer bufferInsert;
+
+            // for writing out to delete vertex channel
+            private IFrameWriter writerDelete;
+            private FrameTupleAppender appenderDelete;
+            private ByteBuffer bufferDelete;
+
             private Vertex vertex;
             private ResetableByteArrayOutputStream bbos = new ResetableByteArrayOutputStream();
             private DataOutput output = new DataOutputStream(bbos);
@@ -117,8 +129,22 @@
                 this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
                 this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
 
-                if (writers.length > 3) {
-                    this.writerAlive = writers[3];
+                this.writerInsert = writers[3];
+                this.bufferInsert = ctx.allocateFrame();
+                this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderInsert.reset(bufferInsert, true);
+                this.writers.add(writerInsert);
+                this.appenders.add(appenderInsert);
+
+                this.writerDelete = writers[4];
+                this.bufferDelete = ctx.allocateFrame();
+                this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderDelete.reset(bufferDelete, true);
+                this.writers.add(writerDelete);
+                this.appenders.add(appenderDelete);
+
+                if (writers.length > 5) {
+                    this.writerAlive = writers[5];
                     this.bufferAlive = ctx.allocateFrame();
                     this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
                     this.appenderAlive.reset(bufferAlive, true);
@@ -128,6 +154,8 @@
                 }
 
                 tbs.add(tbMsg);
+                tbs.add(tbInsert);
+                tbs.add(tbDelete);
                 tbs.add(tbAlive);
             }
 
@@ -167,6 +195,9 @@
             @Override
             public void close() throws HyracksDataException {
                 FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+                FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+                FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+
                 if (pushAlive)
                     FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
                 if (!terminate) {
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index eb08b51..a4d54c8 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -59,6 +59,8 @@
             private final ArrayTupleBuilder tbAlive = new ArrayTupleBuilder(2);
             private final ArrayTupleBuilder tbTerminate = new ArrayTupleBuilder(1);
             private final ArrayTupleBuilder tbGlobalAggregate = new ArrayTupleBuilder(1);
+            private final ArrayTupleBuilder tbInsert = new ArrayTupleBuilder(2);
+            private final ArrayTupleBuilder tbDelete = new ArrayTupleBuilder(1);
 
             // for writing out to message channel
             private IFrameWriter writerMsg;
@@ -83,6 +85,16 @@
             private ByteBuffer bufferTerminate;
             private boolean terminate = true;
 
+            // for writing out to insert vertex channel
+            private IFrameWriter writerInsert;
+            private FrameTupleAppender appenderInsert;
+            private ByteBuffer bufferInsert;
+
+            // for writing out to delete vertex channel
+            private IFrameWriter writerDelete;
+            private FrameTupleAppender appenderDelete;
+            private ByteBuffer bufferDelete;
+
             // dummy empty msgList
             private MsgList msgList = new MsgList();
             private ArrayIterator msgIterator = new ArrayIterator();
@@ -120,8 +132,22 @@
                 this.appenderGlobalAggregate = new FrameTupleAppender(ctx.getFrameSize());
                 this.appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
 
-                if (writers.length > 3) {
-                    this.writerAlive = writers[3];
+                this.writerInsert = writers[3];
+                this.bufferInsert = ctx.allocateFrame();
+                this.appenderInsert = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderInsert.reset(bufferInsert, true);
+                this.writers.add(writerInsert);
+                this.appenders.add(appenderInsert);
+
+                this.writerDelete = writers[4];
+                this.bufferDelete = ctx.allocateFrame();
+                this.appenderDelete = new FrameTupleAppender(ctx.getFrameSize());
+                this.appenderDelete.reset(bufferDelete, true);
+                this.writers.add(writerDelete);
+                this.appenders.add(appenderDelete);
+
+                if (writers.length > 5) {
+                    this.writerAlive = writers[5];
                     this.bufferAlive = ctx.allocateFrame();
                     this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
                     this.appenderAlive.reset(bufferAlive, true);
@@ -132,6 +158,8 @@
                 msgList.reset(msgIterator);
 
                 tbs.add(tbMsg);
+                tbs.add(tbInsert);
+                tbs.add(tbDelete);
                 tbs.add(tbAlive);
             }
 
@@ -171,6 +199,9 @@
             @Override
             public void close() throws HyracksDataException {
                 FrameTupleUtils.flushTuplesFinal(appenderMsg, writerMsg);
+                FrameTupleUtils.flushTuplesFinal(appenderInsert, writerInsert);
+                FrameTupleUtils.flushTuplesFinal(appenderDelete, writerDelete);
+
                 if (pushAlive)
                     FrameTupleUtils.flushTuplesFinal(appenderAlive, writerAlive);
                 if (!terminate) {