diff --git a/pregelix-core/pom.xml b/pregelix-core/pom.xml
index 17ea6c3..34faa82 100644
--- a/pregelix-core/pom.xml
+++ b/pregelix-core/pom.xml
@@ -1,4 +1,5 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>pregelix-core</artifactId>
 	<packaging>jar</packaging>
@@ -19,6 +20,7 @@
 		<plugins>
 			<plugin>
 				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.4</version>
 				<executions>
 					<execution>
 						<id>balancer</id>
@@ -38,11 +40,6 @@
 							</includes>
 						</configuration>
 					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
 					<execution>
 						<id>generator</id>
 						<goals>
@@ -64,25 +61,6 @@
 				</executions>
 			</plugin>
 			<plugin>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>patch</id>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<phase>package</phase>
-						<configuration>
-							<classifier>patch</classifier>
-							<finalName>a-hadoop</finalName>
-							<includes>
-								<include>**/org/apache/**</include>
-							</includes>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-compiler-plugin</artifactId>
 				<version>2.0.2</version>
@@ -94,6 +72,7 @@
 			<plugin>
 				<groupId>org.codehaus.mojo</groupId>
 				<artifactId>appassembler-maven-plugin</artifactId>
+				<version>1.3</version>
 				<executions>
 					<execution>
 						<configuration>
@@ -166,25 +145,6 @@
 							</resources>
 						</configuration>
 					</execution>
-					<execution>
-						<id>copy-hadoop-patch</id>
-						<!-- here the phase you need -->
-						<phase>package</phase>
-						<goals>
-							<goal>copy-resources</goal>
-						</goals>
-						<configuration>
-							<outputDirectory>target/appassembler/lib</outputDirectory>
-							<resources>
-								<resource>
-									<directory>target</directory>
-									<includes>
-										<include>a-hadoop-patch.jar</include>
-									</includes>
-								</resource>
-							</resources>
-						</configuration>
-					</execution>
 				</executions>
 			</plugin>
 			<plugin>
@@ -193,7 +153,8 @@
 				<version>2.7.2</version>
 				<configuration>
 					<forkMode>pertest</forkMode>
-					<argLine>-enableassertions -Xmx512m -XX:MaxPermSize=300m -Dfile.encoding=UTF-8
+					<argLine>-enableassertions -Xmx512m -XX:MaxPermSize=300m
+						-Dfile.encoding=UTF-8
 						-Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
 					<includes>
 						<include>**/*TestSuite.java</include>
@@ -203,6 +164,7 @@
 			</plugin>
 			<plugin>
 				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version>
 				<configuration>
 					<filesets>
 						<fileset>
@@ -335,7 +297,7 @@
 		<dependency>
 			<groupId>edu.uci.ics.hyracks.examples</groupId>
 			<artifactId>hyracks-integration-tests</artifactId>
-			<version>0.2.1</version>
+			<version>0.2.3-SNAPSHOT</version>
 			<scope>test</scope>
 		</dependency>
 		<dependency>
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index de29dbc..0b1be61 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -76,8 +76,8 @@
 import edu.uci.ics.pregelix.core.util.DatatypeHelper;
 import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.FileWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
@@ -89,8 +89,6 @@
     private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
     protected static final int MB = 1048576;
     protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
-    protected static final int frameSize = ClusterConfig.getFrameSize();
-    protected static final int maxFrameSize = (int) (((long) 32 * MB) / frameSize);
     protected static final int tableSize = 10485767;
     protected static final String PRIMARY_INDEX = "primary";
     protected final Configuration conf;
@@ -98,6 +96,8 @@
     protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
     protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
     protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+    protected int frameSize = ClusterConfig.getFrameSize();
+    protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
 
     protected static final String SECONDARY_INDEX_ODD = "secondary1";
     protected static final String SECONDARY_INDEX_EVEN = "secondary2";
@@ -107,6 +107,17 @@
         this.giraphJob = job;
         this.initJobConfiguration();
         job.setJobId(jobId);
+
+        // set the frame size to be the one user specified if the user did
+        // specify.
+        int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
+        if (specifiedFrameSize > 0) {
+            frameSize = specifiedFrameSize;
+            maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+        }
+        if (maxFrameNumber <= 0) {
+            maxFrameNumber = 1;
+        }
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -189,9 +200,10 @@
         RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), vertexClass.getName());
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
         VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
-                confFactory);
-        ClusterConfig.setLocationConstraint(spec, scanner, splits);
+                readSchedule, confFactory);
+        ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
          * construct sort operator
@@ -203,7 +215,7 @@
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameSize, sortFields,
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
                 nkmFactory, comparatorFactories, recordDescriptor);
         ClusterConfig.setLocationConstraint(spec, sorter);
 
@@ -264,9 +276,10 @@
         RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), vertexClass.getName());
         IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+        String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
         VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
-                confFactory);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner, splits.size());
+                readSchedule, confFactory);
+        ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
          * construct sort operator
@@ -280,7 +293,7 @@
                 .getClass());
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
                 nkmFactory, comparatorFactories, recordDescriptor);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter, splits.size());
+        ClusterConfig.setLocationConstraint(spec, sorter);
 
         /**
          * construct write file operator
@@ -292,7 +305,7 @@
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
                 vertexIdClass.getName(), vertexClass.getName());
-        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
                 resultFileSplitProvider, preHookFactory, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
         PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -357,7 +370,7 @@
         IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
         IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
                 vertexIdClass.getName(), vertexClass.getName());
-        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
                 resultFileSplitProvider, preHookFactory, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
         PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 00cdf07..727e7fe 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -39,6 +39,9 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -127,13 +130,16 @@
                 MsgList.class.getName());
         RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), messageValueClass.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 4,
+                new BTreeDataflowHelperFactory(), inputRdFactory, 6,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate, rdFinal);
+                rdPartialAggregate, rdInsert, rdDelete, rdFinal);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -173,7 +179,7 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
@@ -212,9 +218,36 @@
         EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
         ClusterConfig.setLocationConstraint(spec, emptySink);
 
+        /**
+         * add the insert operator to insert vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
+
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -223,7 +256,18 @@
                 terminateWriter, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
                 finalAggregator, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 3, btreeBulkLoad, 0);
+
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 5, btreeBulkLoad, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
                 localGby, 0, globalGby, 0);
@@ -235,6 +279,8 @@
         spec.addRoot(btreeBulkLoad);
         spec.addRoot(terminateWriter);
         spec.addRoot(finalAggregator);
+        spec.addRoot(emptySink3);
+        spec.addRoot(emptySink4);
 
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         spec.setFrameSize(frameSize);
@@ -261,6 +307,9 @@
                 .getClass());
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -316,8 +365,8 @@
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 4, new ComputeUpdateFunctionFactory(confFactory),
-                preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdFinal);
+                new BTreeDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
+                preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -342,7 +391,7 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
@@ -395,6 +444,32 @@
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
 
+        /**
+         * add the insert operator to insert vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
+
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
@@ -406,10 +481,18 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
                 terminateWriter, 0);
-
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
                 finalAggregator, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), join, 3, btreeBulkLoad, 0);
+
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 5, btreeBulkLoad, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
                 localGby, 0, globalGby, 0);
@@ -420,6 +503,8 @@
         spec.addRoot(emptySink);
         spec.addRoot(btreeBulkLoad);
         spec.addRoot(terminateWriter);
+        spec.addRoot(emptySink3);
+        spec.addRoot(emptySink4);
 
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         spec.setFrameSize(frameSize);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 3847aa7..9bad169 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -39,6 +39,9 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -121,13 +124,16 @@
                 vertexIdClass.getName(), vertexClass.getName());
         RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), messageValueClass.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+                new BTreeDataflowHelperFactory(), inputRdFactory, 5,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate);
+                rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -139,14 +145,15 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false, false);
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+                false);
         PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
@@ -156,8 +163,8 @@
          */
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
-                .getAccumulatingAggregatorFactory(conf, true, true);
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                true, true);
         PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -193,6 +200,33 @@
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
 
+        /**
+         * add the insert operator to insert vertexes
+         */
+        int[] fieldPermutation = new int[] { 0, 1 };
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
+
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
         /** connect all operators **/
@@ -203,6 +237,20 @@
                 terminateWriter, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
                 finalAggregator, 0);
+
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
+        /**
+         * connect the group-by operator
+         */
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
                 localGby, 0, globalGby, 0);
@@ -213,6 +261,8 @@
         spec.addRoot(terminateWriter);
         spec.addRoot(finalAggregator);
         spec.addRoot(emptySink2);
+        spec.addRoot(emptySink3);
+        spec.addRoot(emptySink4);
 
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         spec.setFrameSize(frameSize);
@@ -239,6 +289,9 @@
                 .getClass());
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -286,9 +339,9 @@
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
-                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate);
+                rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -299,14 +352,15 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
         /**
          * construct local pre-clustered group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false, false);
+        IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+                false);
         PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localGby);
@@ -314,8 +368,8 @@
         /**
          * construct global group-by operator
          */
-        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
-                .getAccumulatingAggregatorFactory(conf, true, true);
+        IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+                true, true);
         PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
                 sortCmpFactories, aggregatorFactoryFinal, rdFinal);
         ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -351,6 +405,33 @@
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
 
+        /**
+         * add the insert operator to insert vertexes
+         */
+        int[] fieldPermutation = new int[] { 0, 1 };
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
+
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
@@ -364,6 +445,15 @@
                 terminateWriter, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
                 finalAggregator, 0);
+
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
                 localGby, 0, globalGby, 0);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index ec783a7..ffdef10 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -38,6 +38,9 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -123,10 +126,14 @@
                 vertexIdClass.getName(), vertexClass.getName());
         RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), messageValueClass.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+                new BTreeDataflowHelperFactory(), inputRdFactory, 5,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, scanner);
@@ -140,7 +147,7 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, globalSort);
 
@@ -185,6 +192,33 @@
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
 
+        /**
+         * add the insert operator to insert vertexes
+         */
+        int[] fieldPermutation = new int[] { 0, 1 };
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
+
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
@@ -196,6 +230,17 @@
                 terminateWriter, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
                 finalAggregator, 0);
+
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
         spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
@@ -204,6 +249,8 @@
         spec.addRoot(terminateWriter);
         spec.addRoot(finalAggregator);
         spec.addRoot(emptySink2);
+        spec.addRoot(emptySink3);
+        spec.addRoot(emptySink4);
 
         spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
         spec.setFrameSize(frameSize);
@@ -230,6 +277,9 @@
                 .getClass());
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -277,7 +327,7 @@
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
-                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, join);
@@ -290,7 +340,7 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, globalSort);
 
@@ -333,6 +383,31 @@
         FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+        
+
+        int[] fieldPermutation = new int[] { 0, 1 };
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+        
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
 
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
@@ -347,6 +422,16 @@
                 terminateWriter, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
                 finalAggregator, 0);
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+        
         spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index bb939e3..cc12523 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -38,6 +38,9 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -119,10 +122,14 @@
                 vertexIdClass.getName(), vertexClass.getName());
         RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
                 vertexIdClass.getName(), messageValueClass.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
                 recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
-                new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+                new BTreeDataflowHelperFactory(), inputRdFactory, 5,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, scanner);
@@ -136,7 +143,7 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
@@ -152,7 +159,7 @@
         /**
          * construct global sort operator
          */
-        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, globalSort);
 
@@ -198,6 +205,33 @@
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
 
+        /**
+         * add the insert operator to insert vertexes
+         */
+        int[] fieldPermutation = new int[] { 0, 1 };
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
+
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
         /** connect all operators **/
@@ -208,6 +242,16 @@
                 terminateWriter, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
                 finalAggregator, 0);
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
+                0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
@@ -218,6 +262,8 @@
         spec.addRoot(terminateWriter);
         spec.addRoot(finalAggregator);
         spec.addRoot(emptySink2);
+        spec.addRoot(emptySink3);
+        spec.addRoot(emptySink4);
 
         spec.setFrameSize(frameSize);
         return spec;
@@ -243,6 +289,9 @@
                 .getClass());
         RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
                 MsgList.class.getName());
+        RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+                vertexClass.getName());
+        RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         /**
          * construct empty tuple operator
@@ -290,7 +339,7 @@
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
                 spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
-                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+                keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
                 rdPartialAggregate);
         ClusterConfig.setLocationConstraint(spec, join);
@@ -303,7 +352,7 @@
         IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
         sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
                 .getClass());
-        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, localSort);
 
@@ -319,7 +368,7 @@
         /**
          * construct global sort operator
          */
-        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+        ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
                 nkmFactory, sortCmpFactories, rdUnnestedMessage);
         ClusterConfig.setLocationConstraint(spec, globalSort);
 
@@ -363,6 +412,33 @@
                 configurationFactory, aggRdFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
 
+        /**
+         * add the insert operator to insert vertexes
+         */
+        int[] fieldPermutation = new int[] { 0, 1 };
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, insertOp);
+
+        /**
+         * add the delete operator to delete vertexes
+         */
+        TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+                NoOpOperationCallbackProvider.INSTANCE);
+        ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+        /** construct empty sink operator */
+        EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+        ClusterConfig.setLocationConstraint(spec, emptySink4);
+
         ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
@@ -376,6 +452,14 @@
                 terminateWriter, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
                 finalAggregator, 0);
+        /**
+         * connect the insert/delete operator
+         */
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
@@ -386,6 +470,8 @@
         spec.addRoot(terminateWriter);
         spec.addRoot(finalAggregator);
         spec.addRoot(emptySink);
+        spec.addRoot(emptySink3);
+        spec.addRoot(emptySink4);
 
         spec.setFrameSize(frameSize);
         return spec;
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 8eadab9..d26e637 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -40,6 +40,7 @@
 import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler;
 
 public class ClusterConfig {
 
@@ -49,6 +50,7 @@
     private static Properties clusterProperties = new Properties();
     private static Map<String, List<String>> ipToNcMapping;
     private static String[] stores;
+    private static Scheduler hdfsScheduler;
 
     /**
      * let tests set config path to be whatever
@@ -211,6 +213,8 @@
                 NCs[i] = entry.getKey();
                 i++;
             }
+
+            hdfsScheduler = new Scheduler(ipAddress, port);
         } catch (Exception e) {
             throw new IllegalStateException(e);
         }
@@ -218,4 +222,8 @@
         loadClusterProperties();
         loadStores();
     }
+
+    public static Scheduler getHdfsScheduler() {
+        return hdfsScheduler;
+    }
 }
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index ed04746..2a2e2bf 100644
--- a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -51,7 +51,8 @@
         ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
         ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
         ccConfig.defaultMaxJobAttempts = 0;
-        ccConfig.jobHistorySize = 10;
+        ccConfig.jobHistorySize = 0;
+        ccConfig.profileDumpPeriod = -1;
 
         // cluster controller
         cc = new ClusterControllerService(ccConfig);
diff --git a/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java b/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java
deleted file mode 100644
index 5efdde8..0000000
--- a/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Names a file or directory in a {@link FileSystem}. Path strings use slash as
- * the directory separator. A path string is absolute if it begins with a slash.
- */
-@SuppressWarnings("rawtypes")
-public class Path implements Comparable, Serializable {
-    private static final long serialVersionUID = 1L;
-    /** The directory separator, a slash. */
-    public static final String SEPARATOR = "/";
-    public static final char SEPARATOR_CHAR = '/';
-
-    public static final String CUR_DIR = ".";
-
-    static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
-
-    private URI uri; // a hierarchical uri
-
-    /** Resolve a child path against a parent path. */
-    public Path(String parent, String child) {
-        this(new Path(parent), new Path(child));
-    }
-
-    /** Resolve a child path against a parent path. */
-    public Path(Path parent, String child) {
-        this(parent, new Path(child));
-    }
-
-    /** Resolve a child path against a parent path. */
-    public Path(String parent, Path child) {
-        this(new Path(parent), child);
-    }
-
-    /** Resolve a child path against a parent path. */
-    public Path(Path parent, Path child) {
-        // Add a slash to parent's path so resolution is compatible with URI's
-        URI parentUri = parent.uri;
-        String parentPath = parentUri.getPath();
-        if (!(parentPath.equals("/") || parentPath.equals("")))
-            try {
-                parentUri = new URI(parentUri.getScheme(), parentUri.getAuthority(), parentUri.getPath() + "/", null,
-                        parentUri.getFragment());
-            } catch (URISyntaxException e) {
-                throw new IllegalArgumentException(e);
-            }
-        URI resolved = parentUri.resolve(child.uri);
-        initialize(resolved.getScheme(), resolved.getAuthority(), normalizePath(resolved.getPath()),
-                resolved.getFragment());
-    }
-
-    private void checkPathArg(String path) {
-        // disallow construction of a Path from an empty string
-        if (path == null) {
-            throw new IllegalArgumentException("Can not create a Path from a null string");
-        }
-        if (path.length() == 0) {
-            throw new IllegalArgumentException("Can not create a Path from an empty string");
-        }
-    }
-
-    /**
-     * Construct a path from a String. Path strings are URIs, but with unescaped
-     * elements and some additional normalization.
-     */
-    public Path(String pathString) {
-        checkPathArg(pathString);
-
-        // We can't use 'new URI(String)' directly, since it assumes things are
-        // escaped, which we don't require of Paths.
-
-        // add a slash in front of paths with Windows drive letters
-        if (hasWindowsDrive(pathString, false))
-            pathString = "/" + pathString;
-
-        // parse uri components
-        String scheme = null;
-        String authority = null;
-
-        int start = 0;
-
-        // parse uri scheme, if any
-        int colon = pathString.indexOf(':');
-        int slash = pathString.indexOf('/');
-        if ((colon != -1) && ((slash == -1) || (colon < slash))) { // has a
-                                                                   // scheme
-            scheme = pathString.substring(0, colon);
-            start = colon + 1;
-        }
-
-        // parse uri authority, if any
-        if (pathString.startsWith("//", start) && (pathString.length() - start > 2)) { // has
-                                                                                       // authority
-            int nextSlash = pathString.indexOf('/', start + 2);
-            int authEnd = nextSlash > 0 ? nextSlash : pathString.length();
-            authority = pathString.substring(start + 2, authEnd);
-            start = authEnd;
-        }
-
-        // uri path is the rest of the string -- query & fragment not supported
-        String path = pathString.substring(start, pathString.length());
-
-        initialize(scheme, authority, path, null);
-    }
-
-    /** Construct a Path from components. */
-    public Path(String scheme, String authority, String path) {
-        checkPathArg(path);
-        initialize(scheme, authority, path, null);
-    }
-
-    /**
-     * Construct a path from a URI
-     */
-    public Path(URI aUri) {
-        uri = aUri;
-    }
-
-    private void initialize(String scheme, String authority, String path, String fragment) {
-        try {
-            this.uri = new URI(scheme, authority, normalizePath(path), null, fragment).normalize();
-        } catch (URISyntaxException e) {
-            throw new IllegalArgumentException(e);
-        }
-    }
-
-    private String normalizePath(String path) {
-        // remove double slashes & backslashes
-        if (path.indexOf("//") != -1) {
-            path = path.replace("//", "/");
-        }
-        if (path.indexOf("\\") != -1) {
-            path = path.replace("\\", "/");
-        }
-
-        // trim trailing slash from non-root path (ignoring windows drive)
-        int minLength = hasWindowsDrive(path, true) ? 4 : 1;
-        if (path.length() > minLength && path.endsWith("/")) {
-            path = path.substring(0, path.length() - 1);
-        }
-
-        return path;
-    }
-
-    private boolean hasWindowsDrive(String path, boolean slashed) {
-        if (!WINDOWS)
-            return false;
-        int start = slashed ? 1 : 0;
-        return path.length() >= start + 2
-                && (slashed ? path.charAt(0) == '/' : true)
-                && path.charAt(start + 1) == ':'
-                && ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') || (path.charAt(start) >= 'a' && path
-                        .charAt(start) <= 'z'));
-    }
-
-    /** Convert this to a URI. */
-    public URI toUri() {
-        return uri;
-    }
-
-    /** Return the FileSystem that owns this Path. */
-    public FileSystem getFileSystem(Configuration conf) throws IOException {
-        return FileSystem.get(this.toUri(), conf);
-    }
-
-    /** True if the directory of this path is absolute. */
-    public boolean isAbsolute() {
-        int start = hasWindowsDrive(uri.getPath(), true) ? 3 : 0;
-        return uri.getPath().startsWith(SEPARATOR, start);
-    }
-
-    /** Returns the final component of this path. */
-    public String getName() {
-        String path = uri.getPath();
-        int slash = path.lastIndexOf(SEPARATOR);
-        return path.substring(slash + 1);
-    }
-
-    /** Returns the parent of a path or null if at root. */
-    public Path getParent() {
-        String path = uri.getPath();
-        int lastSlash = path.lastIndexOf('/');
-        int start = hasWindowsDrive(path, true) ? 3 : 0;
-        if ((path.length() == start) || // empty path
-                (lastSlash == start && path.length() == start + 1)) { // at root
-            return null;
-        }
-        String parent;
-        if (lastSlash == -1) {
-            parent = CUR_DIR;
-        } else {
-            int end = hasWindowsDrive(path, true) ? 3 : 0;
-            parent = path.substring(0, lastSlash == end ? end + 1 : lastSlash);
-        }
-        return new Path(uri.getScheme(), uri.getAuthority(), parent);
-    }
-
-    /** Adds a suffix to the final name in the path. */
-    public Path suffix(String suffix) {
-        return new Path(getParent(), getName() + suffix);
-    }
-
-    public String toString() {
-        // we can't use uri.toString(), which escapes everything, because we
-        // want
-        // illegal characters unescaped in the string, for glob processing, etc.
-        StringBuffer buffer = new StringBuffer();
-        if (uri.getScheme() != null) {
-            buffer.append(uri.getScheme());
-            buffer.append(":");
-        }
-        if (uri.getAuthority() != null) {
-            buffer.append("//");
-            buffer.append(uri.getAuthority());
-        }
-        if (uri.getPath() != null) {
-            String path = uri.getPath();
-            if (path.indexOf('/') == 0 && hasWindowsDrive(path, true) && // has
-                                                                         // windows
-                                                                         // drive
-                    uri.getScheme() == null && // but no scheme
-                    uri.getAuthority() == null) // or authority
-                path = path.substring(1); // remove slash before drive
-            buffer.append(path);
-        }
-        if (uri.getFragment() != null) {
-            buffer.append("#");
-            buffer.append(uri.getFragment());
-        }
-        return buffer.toString();
-    }
-
-    public boolean equals(Object o) {
-        if (!(o instanceof Path)) {
-            return false;
-        }
-        Path that = (Path) o;
-        return this.uri.equals(that.uri);
-    }
-
-    public int hashCode() {
-        return uri.hashCode();
-    }
-
-    public int compareTo(Object o) {
-        Path that = (Path) o;
-        return this.uri.compareTo(that.uri);
-    }
-
-    /** Return the number of elements in this path. */
-    public int depth() {
-        String path = uri.getPath();
-        int depth = 0;
-        int slash = path.length() == 1 && path.charAt(0) == '/' ? -1 : 0;
-        while (slash != -1) {
-            depth++;
-            slash = path.indexOf(SEPARATOR, slash + 1);
-        }
-        return depth;
-    }
-
-    /** Returns a qualified path object. */
-    public Path makeQualified(FileSystem fs) {
-        Path path = this;
-        if (!isAbsolute()) {
-            path = new Path(fs.getWorkingDirectory(), this);
-        }
-
-        URI pathUri = path.toUri();
-        URI fsUri = fs.getUri();
-
-        String scheme = pathUri.getScheme();
-        String authority = pathUri.getAuthority();
-        String fragment = pathUri.getFragment();
-        if (scheme != null && (authority != null || fsUri.getAuthority() == null))
-            return path;
-
-        if (scheme == null) {
-            scheme = fsUri.getScheme();
-        }
-
-        if (authority == null) {
-            authority = fsUri.getAuthority();
-            if (authority == null) {
-                authority = "";
-            }
-        }
-
-        URI newUri = null;
-        try {
-            newUri = new URI(scheme, authority, normalizePath(pathUri.getPath()), null, fragment);
-        } catch (URISyntaxException e) {
-            throw new IllegalArgumentException(e);
-        }
-        return new Path(newUri);
-    }
-
-    /** Returns a qualified path object. */
-    public Path makeQualified(URI defaultUri, Path workingDir) {
-        Path path = this;
-        if (!isAbsolute()) {
-            path = new Path(workingDir, this);
-        }
-
-        URI pathUri = path.toUri();
-
-        String scheme = pathUri.getScheme();
-        String authority = pathUri.getAuthority();
-        String fragment = pathUri.getFragment();
-
-        if (scheme != null && (authority != null || defaultUri.getAuthority() == null))
-            return path;
-
-        if (scheme == null) {
-            scheme = defaultUri.getScheme();
-        }
-
-        if (authority == null) {
-            authority = defaultUri.getAuthority();
-            if (authority == null) {
-                authority = "";
-            }
-        }
-
-        URI newUri = null;
-        try {
-            newUri = new URI(scheme, authority, normalizePath(pathUri.getPath()), null, fragment);
-        } catch (URISyntaxException e) {
-            throw new IllegalArgumentException(e);
-        }
-        return new Path(newUri);
-    }
-}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java b/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
deleted file mode 100644
index ac72160..0000000
--- a/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * <code>InputSplit</code> represents the data to be processed by an individual {@link Mapper}.
- * <p>
- * Typically, it presents a byte-oriented view on the input and is the responsibility of {@link RecordReader} of the job to process this and present a record-oriented view.
- * 
- * @see InputFormat
- * @see RecordReader
- */
-public abstract class InputSplit implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * Get the size of the split, so that the input splits can be sorted by
-     * size.
-     * 
-     * @return the number of bytes in the split
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public abstract long getLength() throws IOException, InterruptedException;
-
-    /**
-     * Get the list of nodes by name where the data for the split would be
-     * local. The locations do not need to be serialized.
-     * 
-     * @return a new array of the node nodes.
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public abstract String[] getLocations() throws IOException, InterruptedException;
-}
\ No newline at end of file
diff --git a/pregelix-core/src/main/resources/scripts/pregelix b/pregelix-core/src/main/resources/scripts/pregelix
index c3fd27b..6997078 100644
--- a/pregelix-core/src/main/resources/scripts/pregelix
+++ b/pregelix-core/src/main/resources/scripts/pregelix
@@ -91,7 +91,7 @@
   REPO="$BASEDIR"/lib
 fi
 
-CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:"$BASEDIR"/etc:$(echo ${REPO}/*.jar | tr ' ' ':'):$1
+CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:"$BASEDIR"/etc:$1
 
 # For Cygwin, switch paths to Windows format before running java
 if $cygwin; then
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index 4dfe57d..97659d4 100644
--- a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -62,7 +62,7 @@
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
 import edu.uci.ics.pregelix.core.util.TestUtils;
-import edu.uci.ics.pregelix.dataflow.std.FileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.ProjectOperatorDescriptor;
 import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
@@ -195,7 +195,7 @@
         FileSplit[] results = new FileSplit[1];
         results[0] = resultFile;
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
-        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
                 null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
         PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -362,7 +362,7 @@
         FileSplit[] results = new FileSplit[1];
         results[0] = resultFile;
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
-        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
                 null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
         PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -459,7 +459,7 @@
         FileSplit[] results = new FileSplit[1];
         results[0] = resultFile;
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
-        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
                 null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
         PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -566,7 +566,7 @@
         FileSplit[] results = new FileSplit[1];
         results[0] = resultFile;
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
-        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
                 null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
         PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
diff --git a/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml b/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..f89dd79 100644
--- a/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,8 +18,8 @@
       <value>20</value>
    </property>
    <property>
-      <name>mapred.min.split.size</name>
-      <value>65536</value>
+      <name>mapred.max.split.size</name>
+      <value>4096</value>
    </property>
 
 </configuration>
