Merged hyracks_asterix_stabilization upto rev 1913

git-svn-id: https://hyracks.googlecode.com/svn/trunk@1924 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
index 5a6d8cd..b6e8c72 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -44,15 +44,12 @@
 import edu.uci.ics.hyracks.examples.btree.helper.IndexRegistryProvider;
 import edu.uci.ics.hyracks.examples.btree.helper.StorageManagerInterface;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 // This example will insert tuples into the primary and secondary index using an insert pipeline
@@ -62,7 +59,7 @@
         @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
         public String host;
 
-        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
         public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
@@ -91,8 +88,7 @@
         JobSpecification job = createJob(options);
 
         long start = System.currentTimeMillis();
-        JobId jobId = hcc.createJob(options.app, job);
-        hcc.start(jobId);
+        JobId jobId = hcc.startJob(options.app, job);
         hcc.waitForCompletion(jobId);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
@@ -144,11 +140,6 @@
         IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[1];
         primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
 
-        // create factories and providers for secondary B-Tree
-        TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
-        ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(primaryTupleWriterFactory);
-        ITreeIndexFrameFactory primaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(primaryTupleWriterFactory);
-
         // the B-Tree expects its keyfields to be at the front of its input
         // tuple
         int[] primaryFieldPermutation = { 2, 1, 3, 4 }; // map field 2 of input
@@ -160,9 +151,9 @@
 
         // create operator descriptor
         TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsert = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, recDesc, storageManager, indexRegistryProvider, primarySplitProvider,
-                primaryInteriorFrameFactory, primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories,
-                primaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory);
+                spec, recDesc, storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, primaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory, null,
+                NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, primaryInsert, splitNCs);
 
         // prepare insertion into secondary index
@@ -177,12 +168,6 @@
         secondaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
         secondaryComparatorFactories[1] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
 
-        // create factories and providers for secondary B-Tree
-        TypeAwareTupleWriterFactory secondaryTupleWriterFactory = new TypeAwareTupleWriterFactory(secondaryTypeTraits);
-        ITreeIndexFrameFactory secondaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-                secondaryTupleWriterFactory);
-        ITreeIndexFrameFactory secondaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(secondaryTupleWriterFactory);
-
         // the B-Tree expects its keyfields to be at the front of its input
         // tuple
         int[] secondaryFieldPermutation = { 1, 2 };
@@ -190,9 +175,9 @@
                 options.secondaryBTreeName);
         // create operator descriptor
         TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsert = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, recDesc, storageManager, indexRegistryProvider, secondarySplitProvider,
-                secondaryInteriorFrameFactory, secondaryLeafFrameFactory, secondaryTypeTraits,
-                secondaryComparatorFactories, secondaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory);
+                spec, recDesc, storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
+                secondaryComparatorFactories, secondaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory, null,
+                NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, secondaryInsert, splitNCs);
 
         // end the insert pipeline at this sink operator
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index 0b1cd6c..a6c7ea6 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -43,14 +43,11 @@
 import edu.uci.ics.hyracks.examples.btree.helper.IndexRegistryProvider;
 import edu.uci.ics.hyracks.examples.btree.helper.StorageManagerInterface;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 // This example will load a primary index from randomly generated data
@@ -60,7 +57,7 @@
         @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
         public String host;
 
-        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
         public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
@@ -89,8 +86,7 @@
         JobSpecification job = createJob(options);
 
         long start = System.currentTimeMillis();
-        JobId jobId = hcc.createJob(options.app, job);
-        hcc.start(jobId);
+        JobId jobId = hcc.startJob(options.app, job);
         hcc.waitForCompletion(jobId);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
@@ -144,10 +140,7 @@
         typeTraits[2] = IntegerPointable.TYPE_TRAITS;
         typeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
 
-        // create factories and providers for B-Tree
-        TypeAwareTupleWriterFactory tupleWriterFactory = new TypeAwareTupleWriterFactory(typeTraits);
-        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(tupleWriterFactory);
-        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(tupleWriterFactory);
+        // create providers for B-Tree
         IIndexRegistryProvider<IIndex> indexRegistryProvider = IndexRegistryProvider.INSTANCE;
         IStorageManagerInterface storageManager = StorageManagerInterface.INSTANCE;
 
@@ -159,8 +152,8 @@
         IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
         IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, btreeSplitProvider, interiorFrameFactory, leafFrameFactory,
-                typeTraits, comparatorFactories, fieldPermutation, 0.7f, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, btreeSplitProvider, typeTraits, comparatorFactories,
+                fieldPermutation, 0.7f, dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
 
         // distribute the records from the datagen via hashing to the bulk load
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
index aa73a62..d24ba33 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
@@ -42,13 +42,10 @@
 import edu.uci.ics.hyracks.examples.btree.helper.StorageManagerInterface;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 // This example will perform an ordered scan on the primary index
@@ -59,7 +56,7 @@
         @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
         public String host;
 
-        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
         public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
@@ -82,8 +79,7 @@
         JobSpecification job = createJob(options);
 
         long start = System.currentTimeMillis();
-        JobId jobId = hcc.createJob(options.app, job);
-        hcc.start(jobId);
+        JobId jobId = hcc.startJob(options.app, job);
         hcc.waitForCompletion(jobId);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
@@ -106,10 +102,7 @@
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
         comparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
 
-        // create factories and providers for B-Tree
-        TypeAwareTupleWriterFactory tupleWriterFactory = new TypeAwareTupleWriterFactory(typeTraits);
-        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(tupleWriterFactory);
-        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(tupleWriterFactory);
+        // create roviders for B-Tree
         IIndexRegistryProvider<IIndex> indexRegistryProvider = IndexRegistryProvider.INSTANCE;
         IStorageManagerInterface storageManager = StorageManagerInterface.INSTANCE;
 
@@ -149,8 +142,8 @@
         IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
         IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
         BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(spec, recDesc, storageManager,
-                indexRegistryProvider, btreeSplitProvider, interiorFrameFactory, leafFrameFactory, typeTraits,
-                comparatorFactories, true, lowKeyFields, highKeyFields, true, true, dataflowHelperFactory);
+                indexRegistryProvider, btreeSplitProvider, typeTraits, comparatorFactories, lowKeyFields,
+                highKeyFields, true, true, dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, btreeSearchOp, splitNCs);
 
         // have each node print the results of its respective B-Tree
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index ad9ad3a..5aa338a 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -36,15 +36,12 @@
 import edu.uci.ics.hyracks.examples.btree.helper.IndexRegistryProvider;
 import edu.uci.ics.hyracks.examples.btree.helper.StorageManagerInterface;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDiskOrderScanOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 // This example will load a secondary index with <key, primary-index key> pairs
@@ -55,7 +52,7 @@
         @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
         public String host;
 
-        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
         public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
@@ -84,8 +81,7 @@
         JobSpecification job = createJob(options);
 
         long start = System.currentTimeMillis();
-        JobId jobId = hcc.createJob(options.app, job);
-        hcc.start(jobId);
+        JobId jobId = hcc.startJob(options.app, job);
         hcc.waitForCompletion(jobId);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
@@ -122,17 +118,12 @@
         comparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
         comparatorFactories[1] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
 
-        // create factories and providers for primary B-Tree
-        TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
-        ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(primaryTupleWriterFactory);
-        ITreeIndexFrameFactory primaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(primaryTupleWriterFactory);
-
         // use a disk-order scan to read primary index
         IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
         IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
         TreeIndexDiskOrderScanOperatorDescriptor btreeScanOp = new TreeIndexDiskOrderScanOperatorDescriptor(spec,
-                recDesc, storageManager, indexRegistryProvider, primarySplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, dataflowHelperFactory);
+                recDesc, storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, btreeScanOp, splitNCs);
 
         // sort the tuples as preparation for bulk load into secondary index
@@ -148,20 +139,13 @@
         secondaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
         secondaryTypeTraits[1] = IntegerPointable.TYPE_TRAITS;
 
-        // create factories and providers for secondary B-Tree
-        TypeAwareTupleWriterFactory secondaryTupleWriterFactory = new TypeAwareTupleWriterFactory(secondaryTypeTraits);
-        ITreeIndexFrameFactory secondaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-                secondaryTupleWriterFactory);
-        ITreeIndexFrameFactory secondaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(secondaryTupleWriterFactory);
-
         // the B-Tree expects its keyfields to be at the front of its input
         // tuple
         int[] fieldPermutation = { 1, 0 };
         IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName);
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, btreeSplitProvider, secondaryInteriorFrameFactory,
-                secondaryLeafFrameFactory, secondaryTypeTraits, comparatorFactories, fieldPermutation, 0.7f,
-                dataflowHelperFactory);
+                storageManager, indexRegistryProvider, btreeSplitProvider, secondaryTypeTraits, comparatorFactories,
+                fieldPermutation, 0.7f, dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
 
         // connect the ops
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index 2719397..277668b 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -42,13 +42,10 @@
 import edu.uci.ics.hyracks.examples.btree.helper.StorageManagerInterface;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 // This example will perform range search on the secondary index
@@ -59,7 +56,7 @@
         @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
         public String host;
 
-        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
         public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
@@ -85,8 +82,7 @@
         JobSpecification job = createJob(options);
 
         long start = System.currentTimeMillis();
-        JobId jobId = hcc.createJob(options.app, job);
-        hcc.start(jobId);
+        JobId jobId = hcc.startJob(options.app, job);
         hcc.waitForCompletion(jobId);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
@@ -119,12 +115,6 @@
         IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[1];
         primaryComparatorFactories[1] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
 
-        // create factories and providers for secondary B-Tree
-        TypeAwareTupleWriterFactory secondaryTupleWriterFactory = new TypeAwareTupleWriterFactory(secondaryTypeTraits);
-        ITreeIndexFrameFactory secondaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-                secondaryTupleWriterFactory);
-        ITreeIndexFrameFactory secondaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(secondaryTupleWriterFactory);
-
         // schema of tuples coming out of primary index
         RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
@@ -137,11 +127,6 @@
         primaryTypeTraits[2] = IntegerPointable.TYPE_TRAITS;
         primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
 
-        // create factories and providers for secondary B-Tree
-        TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
-        ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(primaryTupleWriterFactory);
-        ITreeIndexFrameFactory primaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(primaryTupleWriterFactory);
-
         // comparators for btree, note that we only need a comparator for the
         // non-unique key
         // i.e. we will have a range condition on the first field only (implying
@@ -183,9 +168,9 @@
                 options.secondaryBTreeName);
         IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
         BTreeSearchOperatorDescriptor secondarySearchOp = new BTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
-                storageManager, indexRegistryProvider, secondarySplitProvider, secondaryInteriorFrameFactory,
-                secondaryLeafFrameFactory, secondaryTypeTraits, searchComparatorFactories, true, secondaryLowKeyFields,
-                secondaryHighKeyFields, true, true, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, secondarySplitProvider, secondaryTypeTraits,
+                searchComparatorFactories, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
+                dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, secondarySearchOp, splitNCs);
 
         // secondary index will output tuples with [UTF8String, Integer]
@@ -199,9 +184,9 @@
 
         IFileSplitProvider primarySplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.primaryBTreeName);
         BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, indexRegistryProvider, primarySplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, primaryLowKeyFields,
-                primaryHighKeyFields, true, true, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primarySplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, primaryLowKeyFields, primaryHighKeyFields, true, true,
+                dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         JobHelper.createPartitionConstraint(spec, primarySearchOp, splitNCs);
 
         // have each node print the results of its respective B-Tree
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
index 36e315d..f342d23 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
@@ -25,7 +25,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -45,7 +45,7 @@
     private final int uniqueField;
     private final long randomSeed;
 
-    public DataGenOperatorDescriptor(JobSpecification spec, RecordDescriptor outputRecord, int numRecords,
+    public DataGenOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outputRecord, int numRecords,
             int uniqueField, int intMinVal, int intMaxVal, int maxStrLen, long randomSeed) {
         super(spec, 0, 1);
         this.numRecords = numRecords;
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
index 7c90618..afe91d1 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
@@ -76,11 +76,12 @@
       </plugin>
       <plugin>
       	<groupId>edu.uci.ics.hyracks</groupId>
-      	<artifactId>hyracks-maven-plugin</artifactId>
-      	<version>0.0.2</version>
+      	<artifactId>hyracks-virtualcluster-maven-plugin</artifactId>
+      	<version>0.2.1-SNAPSHOT</version>
         <configuration>
           <hyracksServerHome>${basedir}/../../../hyracks-server/target/hyracks-server-${project.version}-binary-assembly</hyracksServerHome>
           <hyracksCLIHome>${basedir}/../../../hyracks-cli/target/hyracks-cli-${project.version}-binary-assembly</hyracksCLIHome>
+          <jvmOptions>${jvm.extraargs}</jvmOptions>
         </configuration>
         <executions>
           <execution>
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexScanOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexScanOperatorTest.java
index 6ef1740..8482083 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexScanOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexScanOperatorTest.java
@@ -49,14 +49,12 @@
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 import edu.uci.ics.hyracks.test.support.TestIndexRegistryProvider;
 import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
@@ -80,10 +78,6 @@
     private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
-    private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
-    private ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-            primaryTupleWriterFactory);
-    private ITreeIndexFrameFactory primaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(primaryTupleWriterFactory);
 
     private static String primaryBtreeName = "primary" + simpleDateFormat.format(new Date());
     private static String primaryFileName = System.getProperty("java.io.tmpdir") + sep + primaryBtreeName;
@@ -107,9 +101,20 @@
         primaryTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
         primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
+        createPrimaryIndex();
         loadPrimaryIndexTest();
     }
 
+    public void createPrimaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+                indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
+        spec.addRoot(primaryCreateOp);
+        runTest(spec);
+    }
+    
     public void loadPrimaryIndexTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
@@ -138,9 +143,8 @@
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
         TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
-                dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -175,9 +179,8 @@
         int[] highKeyFields = null; // + infinity
 
         BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, lowKeyFields,
-                highKeyFields, true, true, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
+                highKeyFields, true, true, dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexSearchOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexSearchOperatorTest.java
index 51dbb69..82fecbe 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexSearchOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexSearchOperatorTest.java
@@ -49,14 +49,12 @@
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 import edu.uci.ics.hyracks.test.support.TestIndexRegistryProvider;
 import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
@@ -80,10 +78,6 @@
     private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
-    private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
-    private ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-            primaryTupleWriterFactory);
-    private ITreeIndexFrameFactory primaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(primaryTupleWriterFactory);
 
     private static String primaryBtreeName = "primary" + simpleDateFormat.format(new Date());
     private static String primaryFileName = System.getProperty("java.io.tmpdir") + sep + primaryBtreeName;
@@ -107,9 +101,20 @@
         primaryTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
         primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
+        createPrimaryIndex();
         loadPrimaryIndexTest();
     }
 
+    public void createPrimaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+                indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
+        spec.addRoot(primaryCreateOp);
+        runTest(spec);
+    }
+    
     public void loadPrimaryIndexTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
@@ -138,9 +143,8 @@
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
         TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
-                dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -180,9 +184,8 @@
         int[] highKeyFields = { 1 };
 
         BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, lowKeyFields,
-                highKeyFields, true, true, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
+                highKeyFields, true, true, dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexStatsOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexStatsOperatorTest.java
index a804bae..e63ce11 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexStatsOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexStatsOperatorTest.java
@@ -45,15 +45,13 @@
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexStatsOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 import edu.uci.ics.hyracks.test.support.TestIndexRegistryProvider;
 import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
@@ -77,10 +75,6 @@
     private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
-    private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
-    private ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-            primaryTupleWriterFactory);
-    private ITreeIndexFrameFactory primaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(primaryTupleWriterFactory);
 
     private static String primaryBtreeName = "primary" + simpleDateFormat.format(new Date());
     private static String primaryFileName = System.getProperty("java.io.tmpdir") + sep + primaryBtreeName;
@@ -88,10 +82,6 @@
     private IFileSplitProvider primaryBtreeSplitProvider = new ConstantFileSplitProvider(
             new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(primaryFileName))) });
 
-    private RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
     @Before
     public void setup() throws Exception {
@@ -104,9 +94,20 @@
         primaryTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
         primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
+        createPrimaryIndex();
         loadPrimaryIndexTest();
     }
 
+    public void createPrimaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+                indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
+        spec.addRoot(primaryCreateOp);
+        runTest(spec);
+    }
+    
     public void loadPrimaryIndexTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
@@ -135,9 +136,8 @@
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
         TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
-                dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -153,8 +153,8 @@
         JobSpecification spec = new JobSpecification();
 
         TreeIndexStatsOperatorDescriptor primaryStatsOp = new TreeIndexStatsOperatorDescriptor(spec, storageManager,
-                indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory, primaryLeafFrameFactory,
-                primaryTypeTraits, primaryComparatorFactories, dataflowHelperFactory);
+                indexRegistryProvider, primaryBtreeSplitProvider,
+                primaryTypeTraits, primaryComparatorFactories, dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryStatsOp, NC1_ID);
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
                 createTempFile().getAbsolutePath()) });
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexInsertOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexInsertOperatorTest.java
index a16bbe4..3c87ae3 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexInsertOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexInsertOperatorTest.java
@@ -50,16 +50,14 @@
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 import edu.uci.ics.hyracks.test.support.TestIndexRegistryProvider;
 import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
@@ -83,10 +81,6 @@
     private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
-    private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
-    private ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-            primaryTupleWriterFactory);
-    private ITreeIndexFrameFactory primaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(primaryTupleWriterFactory);
 
     private static String primaryBtreeName = "primary" + simpleDateFormat.format(new Date());
     private static String primaryFileName = System.getProperty("java.io.tmpdir") + sep + primaryBtreeName;
@@ -104,11 +98,6 @@
     private ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
     private int secondaryKeyFieldCount = 2;
     private IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[secondaryKeyFieldCount];
-    private TypeAwareTupleWriterFactory secondaryTupleWriterFactory = new TypeAwareTupleWriterFactory(
-            secondaryTypeTraits);
-    private ITreeIndexFrameFactory secondaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-            secondaryTupleWriterFactory);
-    private ITreeIndexFrameFactory secondaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(secondaryTupleWriterFactory);
 
     private static String secondaryBtreeName = "secondary" + simpleDateFormat.format(new Date());
     private static String secondaryFileName = System.getProperty("java.io.tmpdir") + sep + secondaryBtreeName;
@@ -136,11 +125,23 @@
         secondaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
         secondaryComparatorFactories[1] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
+        createPrimaryIndex();
         loadPrimaryIndexTest();
+        createSecondaryIndex();
         loadSecondaryIndexTest();
         insertPipelineTest();
     }
 
+    public void createPrimaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+                indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
+        spec.addRoot(primaryCreateOp);
+        runTest(spec);
+    }
+    
     public void loadPrimaryIndexTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
@@ -169,9 +170,8 @@
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
         TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
-                dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -182,6 +182,16 @@
         runTest(spec);
     }
 
+    public void createSecondaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        TreeIndexCreateOperatorDescriptor secondaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+                indexRegistryProvider, secondaryBtreeSplitProvider, secondaryTypeTraits, secondaryComparatorFactories,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
+        spec.addRoot(secondaryCreateOp);
+        runTest(spec);
+    }
+    
     public void loadSecondaryIndexTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
@@ -206,9 +216,8 @@
 
         // scan primary index
         BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, lowKeyFields,
-                highKeyFields, true, true, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
+                highKeyFields, true, true, dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
         // sort based on secondary keys
@@ -220,9 +229,8 @@
         // load secondary index
         int[] fieldPermutation = { 3, 0 };
         TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, secondaryBtreeSplitProvider, secondaryInteriorFrameFactory,
-                secondaryLeafFrameFactory, secondaryTypeTraits, secondaryComparatorFactories, fieldPermutation, 0.7f,
-                dataflowHelperFactory);
+                storageManager, indexRegistryProvider, secondaryBtreeSplitProvider, secondaryTypeTraits, secondaryComparatorFactories, fieldPermutation, 0.7f,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryBtreeSearchOp, 0);
@@ -259,16 +267,16 @@
         int[] primaryFieldPermutation = { 0, 1, 2, 4, 5, 7 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor primaryBtreeInsertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, ordersDesc, storageManager, indexRegistryProvider, primaryBtreeSplitProvider,
-                primaryInteriorFrameFactory, primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories,
-                primaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory);
+                primaryTypeTraits, primaryComparatorFactories,
+                primaryFieldPermutation, IndexOp.INSERT, dataflowHelperFactory, null, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeInsertOp, NC1_ID);
 
         // first secondary index
         int[] fieldPermutationB = { 4, 0 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
                 spec, ordersDesc, storageManager, indexRegistryProvider, secondaryBtreeSplitProvider,
-                secondaryInteriorFrameFactory, secondaryLeafFrameFactory, secondaryTypeTraits,
-                secondaryComparatorFactories, fieldPermutationB, IndexOp.INSERT, dataflowHelperFactory);
+                secondaryTypeTraits,
+                secondaryComparatorFactories, fieldPermutationB, IndexOp.INSERT, dataflowHelperFactory, null, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryInsertOp, NC1_ID);
 
         NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
@@ -315,9 +323,9 @@
         // search secondary index
         BTreeSearchOperatorDescriptor secondaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec,
                 secondaryRecDesc, storageManager, indexRegistryProvider, secondaryBtreeSplitProvider,
-                secondaryInteriorFrameFactory, secondaryLeafFrameFactory, secondaryTypeTraits,
-                secondaryComparatorFactories, true, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
-                dataflowHelperFactory);
+                secondaryTypeTraits,
+                secondaryComparatorFactories, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
+                dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeSearchOp, NC1_ID);
 
         // second field from the tuples coming from secondary index
@@ -327,9 +335,8 @@
 
         // search primary index
         BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, primaryLowKeyFields,
-                primaryHighKeyFields, true, true, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories, primaryLowKeyFields,
+                primaryHighKeyFields, true, true, dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexSearchOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexSearchOperatorTest.java
index ef1c6f6..1304f12 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexSearchOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexSearchOperatorTest.java
@@ -49,14 +49,12 @@
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 import edu.uci.ics.hyracks.test.support.TestIndexRegistryProvider;
 import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
@@ -80,10 +78,6 @@
     private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
-    private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
-    private ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-            primaryTupleWriterFactory);
-    private ITreeIndexFrameFactory primaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(primaryTupleWriterFactory);
 
     private static String primaryBtreeName = "primary" + simpleDateFormat.format(new Date());
     private static String primaryFileName = System.getProperty("java.io.tmpdir") + sep + primaryBtreeName;
@@ -101,11 +95,6 @@
     private ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
     private int secondaryKeyFieldCount = 2;
     private IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[secondaryKeyFieldCount];
-    private TypeAwareTupleWriterFactory secondaryTupleWriterFactory = new TypeAwareTupleWriterFactory(
-            secondaryTypeTraits);
-    private ITreeIndexFrameFactory secondaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-            secondaryTupleWriterFactory);
-    private ITreeIndexFrameFactory secondaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(secondaryTupleWriterFactory);
 
     private static String secondaryBtreeName = "secondary" + simpleDateFormat.format(new Date());
     private static String secondaryFileName = System.getProperty("java.io.tmpdir") + sep + secondaryBtreeName;
@@ -133,10 +122,22 @@
         secondaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
         secondaryComparatorFactories[1] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
+        createPrimaryIndex();
         loadPrimaryIndexTest();
+        createSecondaryIndex();
         loadSecondaryIndexTest();
     }
 
+    public void createPrimaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+                indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
+        spec.addRoot(primaryCreateOp);
+        runTest(spec);
+    }
+    
     public void loadPrimaryIndexTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
@@ -165,9 +166,8 @@
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
         TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
-                dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -178,6 +178,16 @@
         runTest(spec);
     }
 
+    public void createSecondaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        TreeIndexCreateOperatorDescriptor secondaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+                indexRegistryProvider, secondaryBtreeSplitProvider, secondaryTypeTraits, secondaryComparatorFactories,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryCreateOp, NC1_ID);
+        spec.addRoot(secondaryCreateOp);
+        runTest(spec);
+    }
+    
     public void loadSecondaryIndexTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
@@ -202,9 +212,8 @@
 
         // scan primary index
         BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, lowKeyFields,
-                highKeyFields, true, true, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
+                highKeyFields, true, true, dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
         // sort based on secondary keys
@@ -216,9 +225,8 @@
         // load secondary index
         int[] fieldPermutation = { 3, 0 };
         TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, secondaryBtreeSplitProvider, secondaryInteriorFrameFactory,
-                secondaryLeafFrameFactory, secondaryTypeTraits, secondaryComparatorFactories, fieldPermutation, 0.7f,
-                dataflowHelperFactory);
+                storageManager, indexRegistryProvider, secondaryBtreeSplitProvider, secondaryTypeTraits, secondaryComparatorFactories, fieldPermutation, 0.7f,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryBtreeSearchOp, 0);
@@ -260,9 +268,8 @@
         // search secondary index
         BTreeSearchOperatorDescriptor secondaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec,
                 secondaryRecDesc, storageManager, indexRegistryProvider, secondaryBtreeSplitProvider,
-                secondaryInteriorFrameFactory, secondaryLeafFrameFactory, secondaryTypeTraits,
-                secondaryComparatorFactories, true, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
-                dataflowHelperFactory);
+                secondaryTypeTraits, secondaryComparatorFactories, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
+                dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeSearchOp, NC1_ID);
 
         int[] primaryLowKeyFields = { 1 }; // second field from the tuples
@@ -272,9 +279,8 @@
 
         // search primary index
         BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, primaryLowKeyFields,
-                primaryHighKeyFields, true, true, dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryTypeTraits, primaryComparatorFactories, primaryLowKeyFields,
+                primaryHighKeyFields, true, true, dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 4ed8361..023bdd9 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -108,11 +108,10 @@
     }
 
     protected void runTest(JobSpecification spec) throws Exception {
-        JobId jobId = hcc.createJob("test", spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(spec.toJSON().toString(2));
         }
-        hcc.start(jobId);
+        JobId jobId = hcc.startJob("test", spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(jobId.toString());
         }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index dec42c1..e0b8c73 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -105,11 +105,10 @@
     }
 
     protected void runTest(JobSpecification spec) throws Exception {
-        JobId jobId = hcc.createJob("test", spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(spec.toJSON().toString(2));
         }
-        hcc.start(jobId);
+        JobId jobId = hcc.startJob("test", spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(jobId.toString());
         }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
index 53a3f79..93e1e9b 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
@@ -26,6 +26,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
@@ -49,11 +50,8 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldGroupAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.AvgFieldMergeAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
@@ -61,6 +59,9 @@
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MinMaxStringFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 
 /**
  *
@@ -107,7 +108,7 @@
                     UTF8StringParserFactory.INSTANCE, }, '|');
 
     private AbstractSingleActivityOperatorDescriptor getPrinter(
-            JobSpecification spec, String prefix) throws IOException {
+            IOperatorDescriptorRegistry spec, String prefix) throws IOException {
 
         AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
                 spec, new ConstantFileSplitProvider(new FileSplit[] {
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index ce72ec5..aea6126 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -46,9 +46,9 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
index a7a0a73..0d5a627 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
@@ -52,11 +53,11 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.FloatSumFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
 
 public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest {
 
@@ -86,8 +87,7 @@
 
     /**
      * Test of aggregations using locality aware connector. The output two files should be the
-     * same, each of which is the aggregation of two copies of the lineitem.tbl. 
-     * 
+     * same, each of which is the aggregation of two copies of the lineitem.tbl.
      * Note that if the hashing connector is not working correctly, the two files may be different. This
      * also means that even the output files are the same, the hashing may have other problems.
      * 
@@ -98,66 +98,47 @@
 
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, "asterix-001", "asterix-002", "asterix-003", "asterix-004");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001", "asterix-002",
+                "asterix-003", "asterix-004");
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }),
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-                        .of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new IntSumFieldAggregatorFactory(3, true),
-                                new FloatSumFieldAggregatorFactory(5, true) }),
-                outputRec, tableSize);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
+                        new FloatSumFieldAggregatorFactory(5, true) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                "asterix-005", "asterix-006");
-        
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, "asterix-005", "asterix-006");
+
         BitSet nodemap = new BitSet(8);
-        
+
         nodemap.set(0);
         nodemap.set(2);
         nodemap.set(5);
         nodemap.set(7);
 
-        IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                        keyFields,
+        IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }), new HashtableLocalityMap(nodemap));
-                
-                new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
+
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "localityAwareSumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "localityAwareSumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                "asterix-005", "asterix-006");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, "asterix-005", "asterix-006");
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -165,11 +146,11 @@
         spec.addRoot(printer);
         runTest(spec);
     }
-    
+
     /**
      * Test for locality aware connector, using the global hashing node mapper. This should have
      * the exactly the same result as using {@link MToNPartitioningConnectorDescriptor}.
-     *  
+     * 
      * @throws Exception
      */
     @Test
@@ -177,59 +158,40 @@
 
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, "asterix-001", "asterix-002", "asterix-003", "asterix-004");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001", "asterix-002",
+                "asterix-003", "asterix-004");
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }),
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
-                        .of(UTF8StringPointable.FACTORY) },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new IntSumFieldAggregatorFactory(3, true),
-                                new FloatSumFieldAggregatorFactory(5, true) }),
-                outputRec, tableSize);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
+                        new FloatSumFieldAggregatorFactory(5, true) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                "asterix-005", "asterix-006");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, "asterix-005", "asterix-006");
 
-        IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                        keyFields,
+        IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                                 .of(UTF8StringPointable.FACTORY) }), new GlobalHashingLocalityMap());
-                
-                new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
+
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "localityAwareSumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "localityAwareSumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                "asterix-005", "asterix-006");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, "asterix-005", "asterix-006");
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -237,16 +199,14 @@
         spec.addRoot(printer);
         runTest(spec);
     }
-    
-    private AbstractSingleActivityOperatorDescriptor getPrinter(
-            JobSpecification spec, String prefix) throws IOException {
 
-        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
-                spec, new ConstantFileSplitProvider(new FileSplit[] {
-                        new FileSplit("asterix-005", createTempFile()
-                                .getAbsolutePath()),
-                        new FileSplit("asterix-006", createTempFile()
-                                .getAbsolutePath()) }), "\t");
+    private AbstractSingleActivityOperatorDescriptor getPrinter(IOperatorDescriptorRegistry spec, String prefix)
+            throws IOException {
+
+        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec,
+                new ConstantFileSplitProvider(new FileSplit[] {
+                        new FileSplit("asterix-005", createTempFile().getAbsolutePath()),
+                        new FileSplit("asterix-006", createTempFile().getAbsolutePath()) }), "\t");
 
         return printer;
     }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
index c2d3ebf..1ee4400 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -19,6 +19,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -39,8 +40,8 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.misc.LimitOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.OptimizedExternalSortOperatorDescriptor;
 
 public class OptimizedSortMergeTest extends AbstractIntegrationTest {
@@ -74,7 +75,9 @@
                         PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+                createTempFile().getAbsolutePath()) });
+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -125,7 +128,9 @@
         LimitOperatorDescriptor filter = new LimitOperatorDescriptor(spec, ordersDesc, outputLimit);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, filter, NC1_ID);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+                createTempFile().getAbsolutePath()) });
+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index 6411390..1e60372 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -20,6 +20,7 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -60,7 +61,7 @@
         }
 
         @Override
-        public ITuplePairComparator createTuplePairComparator() {
+        public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
             return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
         }
     }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/BinaryTokenizerOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/BinaryTokenizerOperatorTest.java
index 76a60ae..836e72e 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/BinaryTokenizerOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/BinaryTokenizerOperatorTest.java
@@ -22,7 +22,6 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizerFactory;
 import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.IBinaryTokenizerFactory;
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/WordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/WordInvertedIndexTest.java
index 298c146..d8fd48e 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/WordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/WordInvertedIndexTest.java
@@ -20,6 +20,7 @@
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -52,17 +53,16 @@
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedIndexSearchModifierFactory;
 import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.InvertedIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.InvertedIndexCreateOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.InvertedIndexSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.invertedindex.searchmodifiers.ConjunctiveSearchModifierFactory;
 import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizerFactory;
@@ -86,10 +86,10 @@
 
     private final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
     private final static String sep = System.getProperty("file.separator");
-    private final String dateString = simpleDateFormat.format(new Date());
-    private final String primaryFileName = System.getProperty("java.io.tmpdir") + sep + "primaryBtree" + dateString;
-    private final String btreeFileName = System.getProperty("java.io.tmpdir") + sep + "invIndexBtree" + dateString;
-    private final String invListsFileName = System.getProperty("java.io.tmpdir") + sep + "invIndexLists" + dateString;
+    private final static String dateString = simpleDateFormat.format(new Date());
+    private final static String primaryFileName = System.getProperty("java.io.tmpdir") + sep + "primaryBtree" + dateString;
+    private final static String btreeFileName = System.getProperty("java.io.tmpdir") + sep + "invIndexBtree" + dateString;
+    private final static String invListsFileName = System.getProperty("java.io.tmpdir") + sep + "invIndexLists" + dateString;
 
     private IFileSplitProvider primaryFileSplitProvider = new ConstantFileSplitProvider(
             new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(primaryFileName))) });
@@ -103,10 +103,6 @@
     private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
-    private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
-    private ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-            primaryTupleWriterFactory);
-    private ITreeIndexFrameFactory primaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(primaryTupleWriterFactory);
     private RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
             IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
@@ -143,11 +139,23 @@
         invListsTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
         invListsComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
 
+        createPrimaryIndex();
         loadPrimaryIndex();
         printPrimaryIndex();
+        createInvertedIndex();
         loadInvertedIndex();
     }
 
+    public void createPrimaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+                indexRegistryProvider, primaryFileSplitProvider, primaryTypeTraits, primaryComparatorFactories,
+                btreeDataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
+        spec.addRoot(primaryCreateOp);
+        runTest(spec);
+    }
+    
     @Test
     public void testConjunctiveSearcher() throws Exception {
         IInvertedIndexSearchModifierFactory conjunctiveSearchModifierFactory = new ConjunctiveSearchModifierFactory();
@@ -172,9 +180,8 @@
     private IOperatorDescriptor createPrimaryBulkLoadOp(JobSpecification spec) {
         int[] fieldPermutation = { 0, 1 };
         TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, primaryFileSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
-                btreeDataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryFileSplitProvider, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
+                btreeDataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
         return primaryBtreeBulkLoad;
     }
@@ -199,9 +206,8 @@
         int[] lowKeyFields = null; // - infinity
         int[] highKeyFields = null; // + infinity
         BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, indexRegistryProvider, primaryFileSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, lowKeyFields,
-                highKeyFields, true, true, btreeDataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryFileSplitProvider, primaryTypeTraits, primaryComparatorFactories, lowKeyFields,
+                highKeyFields, true, true, btreeDataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
         return primaryBtreeSearchOp;
     }
@@ -250,11 +256,24 @@
         InvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new InvertedIndexBulkLoadOperatorDescriptor(spec,
                 fieldPermutation, storageManager, btreeFileSplitProvider, invListsFileSplitProvider,
                 indexRegistryProvider, tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits,
-                invListsComparatorFactories, btreeDataflowHelperFactory);
+                invListsComparatorFactories, tokenizerFactory, btreeDataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexBulkLoadOp, NC1_ID);
         return invIndexBulkLoadOp;
     }
 
+    public void createInvertedIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        InvertedIndexCreateOperatorDescriptor invIndexCreateOp = new InvertedIndexCreateOperatorDescriptor(spec,
+                storageManager, btreeFileSplitProvider, invListsFileSplitProvider,
+                indexRegistryProvider, tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits,
+                invListsComparatorFactories, tokenizerFactory, btreeDataflowHelperFactory,
+                NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexCreateOp, NC1_ID);
+        spec.addRoot(invIndexCreateOp);
+        runTest(spec);
+    }
+    
     public void loadInvertedIndex() throws Exception {
         JobSpecification spec = new JobSpecification();
         IOperatorDescriptor keyProviderOp = createScanKeyProviderOp(spec);
@@ -295,7 +314,8 @@
         InvertedIndexSearchOperatorDescriptor invIndexSearchOp = new InvertedIndexSearchOperatorDescriptor(spec, 0,
                 storageManager, btreeFileSplitProvider, invListsFileSplitProvider, indexRegistryProvider,
                 tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
-                btreeDataflowHelperFactory, tokenizerFactory, searchModifierFactory, invListsRecDesc);
+                btreeDataflowHelperFactory, tokenizerFactory, searchModifierFactory, invListsRecDesc, false,
+                NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexSearchOp, NC1_ID);
         return invIndexSearchOp;
     }
@@ -313,4 +333,14 @@
         spec.addRoot(printer);
         runTest(spec);
     }
+    
+    @AfterClass
+    public static void cleanup() throws Exception {
+    	File primary = new File(primaryFileName);
+    	File btree = new File(btreeFileName);
+    	File invLists = new File(invListsFileName);
+        primary.deleteOnExit();
+        btree.deleteOnExit();
+        invLists.deleteOnExit();
+    }
 }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexSearchOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexSearchOperatorTest.java
index 0c0df42..6625148 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexSearchOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexSearchOperatorTest.java
@@ -50,16 +50,14 @@
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriterFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.util.RTreeUtils;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 import edu.uci.ics.hyracks.test.support.TestIndexRegistryProvider;
@@ -74,7 +72,7 @@
 
     private IStorageManagerInterface storageManager = new TestStorageManagerInterface();
     private IIndexRegistryProvider<IIndex> indexRegistryProvider = new TestIndexRegistryProvider();
-    private IIndexDataflowHelperFactory dataflowHelperFactory = new RTreeDataflowHelperFactory();
+    private IIndexDataflowHelperFactory dataflowHelperFactory;
 
     private final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
     private final static String sep = System.getProperty("file.separator");
@@ -85,23 +83,19 @@
     private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
 
-    private RTreeTypeAwareTupleWriterFactory primaryTupleWriterFactory = new RTreeTypeAwareTupleWriterFactory(
-            primaryTypeTraits);
-
     private RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
             DoubleSerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
             DoubleSerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
             UTF8StringSerializerDeserializer.INSTANCE });
 
-    private ITreeIndexFrameFactory primaryInteriorFrameFactory;
-    private ITreeIndexFrameFactory primaryLeafFrameFactory;
-
     private static String primaryRTreeName = "primary" + simpleDateFormat.format(new Date());
     private static String primaryFileName = System.getProperty("java.io.tmpdir") + sep + primaryRTreeName;
 
     private IFileSplitProvider primaryRTreeSplitProvider = new ConstantFileSplitProvider(
             new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(primaryFileName))) });
 
+    private IPrimitiveValueProviderFactory[] primaryValueProviderFactories;
+    
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary R-tree index
@@ -115,16 +109,24 @@
         primaryComparatorFactories[2] = primaryComparatorFactories[0];
         primaryComparatorFactories[3] = primaryComparatorFactories[0];
 
-        IPrimitiveValueProviderFactory[] primaryValueProviderFactories = RTreeUtils
+        primaryValueProviderFactories = RTreeUtils
                 .createPrimitiveValueProviderFactories(primaryComparatorFactories.length, DoublePointable.FACTORY);
-
-        primaryInteriorFrameFactory = new RTreeNSMInteriorFrameFactory(primaryTupleWriterFactory,
-                primaryValueProviderFactories);
-        primaryLeafFrameFactory = new RTreeNSMLeafFrameFactory(primaryTupleWriterFactory, primaryValueProviderFactories);
-
+        dataflowHelperFactory = new RTreeDataflowHelperFactory(primaryValueProviderFactories);
+        
+        createPrimaryIndex();
         loadPrimaryIndexTest();
     }
 
+    public void createPrimaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+                indexRegistryProvider, primaryRTreeSplitProvider, primaryTypeTraits, primaryComparatorFactories,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
+        spec.addRoot(primaryCreateOp);
+        runTest(spec);
+    }
+    
     public void loadPrimaryIndexTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
@@ -144,9 +146,8 @@
 
         int[] fieldPermutation = { 0, 1, 2, 3, 4 };
         TreeIndexBulkLoadOperatorDescriptor primaryRTreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, primaryRTreeSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
-                dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryRTreeSplitProvider, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryRTreeBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), objScanner, 0, primaryRTreeBulkLoad, 0);
@@ -185,9 +186,8 @@
         int[] keyFields = { 0, 1, 2, 3 };
 
         RTreeSearchOperatorDescriptor primaryRTreeSearchOp = new RTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, indexRegistryProvider, primaryRTreeSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, keyFields,
-                dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryRTreeSplitProvider, primaryTypeTraits, primaryComparatorFactories, keyFields,
+                dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryRTreeSearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexStatsOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexStatsOperatorTest.java
index a4912e8..ef2950e 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexStatsOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexStatsOperatorTest.java
@@ -51,7 +51,9 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexStatsOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMLeafFrameFactory;
@@ -70,7 +72,7 @@
 
     private IStorageManagerInterface storageManager = new TestStorageManagerInterface();
     private IIndexRegistryProvider<IIndex> indexRegistryProvider = new TestIndexRegistryProvider();
-    private IIndexDataflowHelperFactory dataflowHelperFactory = new RTreeDataflowHelperFactory();
+    private IIndexDataflowHelperFactory dataflowHelperFactory;
 
     private final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
     private final static String sep = System.getProperty("file.separator");
@@ -98,6 +100,8 @@
     private IFileSplitProvider primaryRTreeSplitProvider = new ConstantFileSplitProvider(
             new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(primaryFileName))) });
 
+    private IPrimitiveValueProviderFactory[] primaryValueProviderFactories;
+    
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary R-tree index
@@ -111,16 +115,28 @@
         primaryComparatorFactories[2] = primaryComparatorFactories[0];
         primaryComparatorFactories[3] = primaryComparatorFactories[0];
 
-        IPrimitiveValueProviderFactory[] primaryValueProviderFactories = RTreeUtils
+        primaryValueProviderFactories = RTreeUtils
                 .createPrimitiveValueProviderFactories(primaryComparatorFactories.length, DoublePointable.FACTORY);
-
+        dataflowHelperFactory = new RTreeDataflowHelperFactory(primaryValueProviderFactories);
+        
         primaryInteriorFrameFactory = new RTreeNSMInteriorFrameFactory(primaryTupleWriterFactory,
                 primaryValueProviderFactories);
         primaryLeafFrameFactory = new RTreeNSMLeafFrameFactory(primaryTupleWriterFactory, primaryValueProviderFactories);
 
+        createPrimaryIndex();
         loadPrimaryIndexTest();
     }
 
+    public void createPrimaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+                indexRegistryProvider, primaryRTreeSplitProvider, primaryTypeTraits, primaryComparatorFactories,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
+        spec.addRoot(primaryCreateOp);
+        runTest(spec);
+    }
+    
     public void loadPrimaryIndexTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
@@ -140,9 +156,8 @@
 
         int[] fieldPermutation = { 0, 1, 2, 3, 4 };
         TreeIndexBulkLoadOperatorDescriptor primaryRTreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, primaryRTreeSplitProvider, primaryInteriorFrameFactory,
-                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
-                dataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryRTreeSplitProvider, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryRTreeBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), objScanner, 0, primaryRTreeBulkLoad, 0);
@@ -156,8 +171,8 @@
         JobSpecification spec = new JobSpecification();
 
         TreeIndexStatsOperatorDescriptor primaryStatsOp = new TreeIndexStatsOperatorDescriptor(spec, storageManager,
-                indexRegistryProvider, primaryRTreeSplitProvider, primaryInteriorFrameFactory, primaryLeafFrameFactory,
-                primaryTypeTraits, primaryComparatorFactories, dataflowHelperFactory);
+                indexRegistryProvider, primaryRTreeSplitProvider, 
+                primaryTypeTraits, primaryComparatorFactories, dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryStatsOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreeSecondaryIndexSearchOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreeSecondaryIndexSearchOperatorTest.java
index 025f675..030afcf 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreeSecondaryIndexSearchOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreeSecondaryIndexSearchOperatorTest.java
@@ -52,21 +52,15 @@
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrameFactory;
-import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMLeafFrameFactory;
-import edu.uci.ics.hyracks.storage.am.rtree.impls.DoublePrimitiveValueProviderFactory;
-import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriterFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.util.RTreeUtils;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 import edu.uci.ics.hyracks.test.support.TestIndexRegistryProvider;
@@ -81,7 +75,7 @@
 
     private IStorageManagerInterface storageManager = new TestStorageManagerInterface();
     private IIndexRegistryProvider<IIndex> indexRegistryProvider = new TestIndexRegistryProvider();
-    private IIndexDataflowHelperFactory dataflowHelperFactory = new RTreeDataflowHelperFactory();
+    private IIndexDataflowHelperFactory dataflowHelperFactory;
     private IIndexDataflowHelperFactory btreeDataflowHelperFactory = new BTreeDataflowHelperFactory();
 
     private final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
@@ -92,12 +86,6 @@
     private ITypeTraits[] primaryBTreeTypeTraits = new ITypeTraits[primaryBTreeFieldCount];
     private int primaryBTreeKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryBTreeComparatorFactories = new IBinaryComparatorFactory[primaryBTreeKeyFieldCount];
-    private TypeAwareTupleWriterFactory primaryBTreeTupleWriterFactory = new TypeAwareTupleWriterFactory(
-            primaryBTreeTypeTraits);
-    private ITreeIndexFrameFactory primaryBTreeInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-            primaryBTreeTupleWriterFactory);
-    private ITreeIndexFrameFactory primaryBTreeLeafFrameFactory = new BTreeNSMLeafFrameFactory(
-            primaryBTreeTupleWriterFactory);
 
     private static String primaryBTreeName = "primaryBTree" + simpleDateFormat.format(new Date());
     private static String primaryBTreeFileName = System.getProperty("java.io.tmpdir") + sep + primaryBTreeName;
@@ -117,13 +105,6 @@
     private ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
     private int secondaryKeyFieldCount = 4;
     private IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[secondaryKeyFieldCount];
-    private IPrimitiveValueProviderFactory[] secondaryValueProviderFactories = new IPrimitiveValueProviderFactory[secondaryKeyFieldCount];
-
-    private RTreeTypeAwareTupleWriterFactory secondaryTupleWriterFactory = new RTreeTypeAwareTupleWriterFactory(
-            secondaryTypeTraits);
-
-    private ITreeIndexFrameFactory secondaryInteriorFrameFactory;
-    private ITreeIndexFrameFactory secondaryLeafFrameFactory;
 
     private static String secondaryRTreeName = "secondary" + simpleDateFormat.format(new Date());
     private static String secondaryFileName = System.getProperty("java.io.tmpdir") + sep + secondaryRTreeName;
@@ -136,6 +117,8 @@
             DoubleSerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
             UTF8StringSerializerDeserializer.INSTANCE });
 
+    private IPrimitiveValueProviderFactory[] secondaryValueProviderFactories;
+    
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary B-tree index
@@ -161,23 +144,28 @@
         secondaryComparatorFactories[1] = secondaryComparatorFactories[0];
         secondaryComparatorFactories[2] = secondaryComparatorFactories[0];
         secondaryComparatorFactories[3] = secondaryComparatorFactories[0];
-        secondaryValueProviderFactories[0] = DoublePrimitiveValueProviderFactory.INSTANCE;
-        secondaryValueProviderFactories[1] = secondaryValueProviderFactories[0];
-        secondaryValueProviderFactories[2] = secondaryValueProviderFactories[0];
-        secondaryValueProviderFactories[3] = secondaryValueProviderFactories[0];
 
-        IPrimitiveValueProviderFactory[] secondaryValueProviderFactories = RTreeUtils
+        secondaryValueProviderFactories = RTreeUtils
                 .createPrimitiveValueProviderFactories(secondaryComparatorFactories.length, DoublePointable.FACTORY);
 
-        secondaryInteriorFrameFactory = new RTreeNSMInteriorFrameFactory(secondaryTupleWriterFactory,
-                secondaryValueProviderFactories);
-        secondaryLeafFrameFactory = new RTreeNSMLeafFrameFactory(secondaryTupleWriterFactory,
-                secondaryValueProviderFactories);
-
+        dataflowHelperFactory = new RTreeDataflowHelperFactory(secondaryValueProviderFactories);
+        
+        createPrimaryIndex();
         loadPrimaryBTreeIndexTest();
+        createSecondaryIndex();
         loadSecondaryIndexTest();
     }
 
+    public void createPrimaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+                indexRegistryProvider, primaryBTreeSplitProvider, primaryBTreeTypeTraits,
+                primaryBTreeComparatorFactories, btreeDataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
+        spec.addRoot(primaryCreateOp);
+        runTest(spec);
+    }
+    
     public void loadPrimaryBTreeIndexTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
@@ -210,9 +198,8 @@
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
         TreeIndexBulkLoadOperatorDescriptor primaryBTreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, primaryBTreeSplitProvider, primaryBTreeInteriorFrameFactory,
-                primaryBTreeLeafFrameFactory, primaryBTreeTypeTraits, primaryBTreeComparatorFactories,
-                fieldPermutation, 0.7f, btreeDataflowHelperFactory);
+                storageManager, indexRegistryProvider, primaryBTreeSplitProvider, primaryBTreeTypeTraits, primaryBTreeComparatorFactories,
+                fieldPermutation, 0.7f, btreeDataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBTreeBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -223,6 +210,16 @@
         runTest(spec);
     }
 
+    public void createSecondaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+                indexRegistryProvider, secondaryRTreeSplitProvider, secondaryTypeTraits, secondaryComparatorFactories,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
+        spec.addRoot(primaryCreateOp);
+        runTest(spec);
+    }
+    
     public void loadSecondaryIndexTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
@@ -248,17 +245,15 @@
         // scan primary index
         BTreeSearchOperatorDescriptor primaryBTreeSearchOp = new BTreeSearchOperatorDescriptor(spec,
                 primaryBTreeRecDesc, storageManager, indexRegistryProvider, primaryBTreeSplitProvider,
-                primaryBTreeInteriorFrameFactory, primaryBTreeLeafFrameFactory, primaryBTreeTypeTraits,
-                primaryBTreeComparatorFactories, true, lowKeyFields, highKeyFields, true, true,
-                btreeDataflowHelperFactory);
+                primaryBTreeTypeTraits, primaryBTreeComparatorFactories, lowKeyFields, highKeyFields, 
+                true, true, btreeDataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBTreeSearchOp, NC1_ID);
 
         // load secondary index
         int[] fieldPermutation = { 6, 7, 8, 9, 0 };
         TreeIndexBulkLoadOperatorDescriptor secondaryRTreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, indexRegistryProvider, secondaryRTreeSplitProvider, secondaryInteriorFrameFactory,
-                secondaryLeafFrameFactory, secondaryTypeTraits, secondaryComparatorFactories, fieldPermutation, 0.7f,
-                dataflowHelperFactory);
+                storageManager, indexRegistryProvider, secondaryRTreeSplitProvider, secondaryTypeTraits, secondaryComparatorFactories, fieldPermutation, 0.7f,
+                dataflowHelperFactory, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryRTreeBulkLoad, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryBTreeSearchOp, 0);
@@ -299,8 +294,7 @@
 
         RTreeSearchOperatorDescriptor secondaryRTreeSearchOp = new RTreeSearchOperatorDescriptor(spec,
                 secondaryRecDesc, storageManager, indexRegistryProvider, secondaryRTreeSplitProvider,
-                secondaryInteriorFrameFactory, secondaryLeafFrameFactory, secondaryTypeTraits,
-                secondaryComparatorFactories, keyFields, dataflowHelperFactory);
+                secondaryTypeTraits, secondaryComparatorFactories, keyFields, dataflowHelperFactory, false, NoOpOperationCallbackProvider.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryRTreeSearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks/hyracks-examples/text-example/textapp/pom.xml b/hyracks/hyracks-examples/text-example/textapp/pom.xml
index a9c4d0d..50b492b 100644
--- a/hyracks/hyracks-examples/text-example/textapp/pom.xml
+++ b/hyracks/hyracks-examples/text-example/textapp/pom.xml
@@ -76,11 +76,12 @@
       </plugin>
       <plugin>
       	<groupId>edu.uci.ics.hyracks</groupId>
-      	<artifactId>hyracks-maven-plugin</artifactId>
-      	<version>0.0.2</version>
+      	<artifactId>hyracks-virtualcluster-maven-plugin</artifactId>
+      	<version>0.2.1-SNAPSHOT</version>
         <configuration>
           <hyracksServerHome>${basedir}/../../../hyracks-server/target/hyracks-server-${project.version}-binary-assembly</hyracksServerHome>
           <hyracksCLIHome>${basedir}/../../../hyracks-cli/target/hyracks-cli-${project.version}-binary-assembly</hyracksCLIHome>
+          <jvmOptions>${jvm.extraargs}</jvmOptions>
         </configuration>
         <executions>
           <execution>
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index 66f7efc..943f232 100644
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -54,12 +54,12 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 
 /**
@@ -71,8 +71,8 @@
         @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
         public String host;
 
-        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
-        public int port = 1099;
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
+        public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
         public String app;
@@ -122,8 +122,7 @@
 
             System.out.print(i + "\t" + (System.currentTimeMillis() - start));
             start = System.currentTimeMillis();
-            JobId jobId = hcc.createJob(options.app, job);
-            hcc.start(jobId);
+            JobId jobId = hcc.startJob(options.app, job);
             hcc.waitForCompletion(jobId);
             System.out.println("\t" + (System.currentTimeMillis() - start));
         }
@@ -201,8 +200,8 @@
 
         switch (alg) {
             case 0: // new external hash graph
-                grouper = new edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor(spec, keys,
-                        framesLimit, new IBinaryComparatorFactory[] {
+                grouper = new edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor(spec,
+                        keys, framesLimit, new IBinaryComparatorFactory[] {
                         // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
                         PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
                         new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
@@ -238,8 +237,8 @@
                         PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
                 spec.connect(scanSortConn2, fileScanner, 0, sorter2, 0);
 
-                grouper = new edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor(spec, keys,
-                        new IBinaryComparatorFactory[] {
+                grouper = new edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor(
+                        spec, keys, new IBinaryComparatorFactory[] {
                         // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
                         PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
                         new MultiFieldsAggregatorFactory(
@@ -274,8 +273,8 @@
                 spec.connect(scanConn2, fileScanner, 0, grouper, 0);
                 break;
             default:
-                grouper = new edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor(spec, keys,
-                        framesLimit, new IBinaryComparatorFactory[] {
+                grouper = new edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor(spec,
+                        keys, framesLimit, new IBinaryComparatorFactory[] {
                         // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
                         PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
                         new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
diff --git a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
index 1708259..31019ab 100644
--- a/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -48,11 +48,11 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
 import edu.uci.ics.hyracks.examples.text.WordTupleParserFactory;
@@ -62,7 +62,7 @@
         @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
         public String host;
 
-        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
         public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
@@ -101,9 +101,8 @@
                 options.algo, options.htSize, options.sbSize, options.format);
 
         long start = System.currentTimeMillis();
-        JobId jobId = hcc.createJob(options.app, job, options.runtimeProfiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME)
+        JobId jobId = hcc.startJob(options.app, job, options.runtimeProfiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME)
                 : EnumSet.noneOf(JobFlag.class));
-        hcc.start(jobId);
         hcc.waitForCompletion(jobId);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 72533e7..01ccdef 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -54,10 +55,10 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
@@ -68,7 +69,7 @@
         @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
         public String host;
 
-        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)", required = false)
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)", required = false)
         public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
@@ -126,9 +127,8 @@
                 options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy);
 
         long start = System.currentTimeMillis();
-        JobId jobId = hcc.createJob(options.app, job,
+        JobId jobId = hcc.startJob(options.app, job,
                 options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
-        hcc.start(jobId);
         hcc.waitForCompletion(jobId);
         long end = System.currentTimeMillis();
         System.err.println(start + " " + end + " " + (end - start));
@@ -315,7 +315,7 @@
         }
 
         @Override
-        public ITuplePairComparator createTuplePairComparator() {
+        public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
             return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
         }
     }