- Added a Tokenize operator on top of the bulk-load operator changes made by Zachary Heilbron.
  The Tokenize operator is added to the logical plan only when data is being bulk-loaded (see the sketch below).
 - Each secondary index is now updated in a separate branch by using the Replicate operator.
 - The Sink operator now accepts multiple inputs.
 - Fixed bulk-load so that it correctly produces auto-generated primary keys (PKs).
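
An illustrative sketch (not part of this change; the class and method names below are hypothetical)
of the tuples the new Tokenize operator conceptually emits during a secondary-index bulk load,
written with plain Java collections: a keyword index receives [token, PK] tuples, while a
length-partitioned index additionally carries the token count, i.e. [token, number of tokens, PK].

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class TokenizeSketch {
        // Non length-partitioned (keyword) index: emit one [token, PK] tuple per token.
        static List<Object[]> tokenizeKeyword(String secondaryKey, Object primaryKey) {
            List<Object[]> tuples = new ArrayList<>();
            for (String token : secondaryKey.split("\\s+")) {
                tuples.add(new Object[] { token, primaryKey });
            }
            return tuples;
        }

        // Length-partitioned index: also carry the token count,
        // i.e. emit one [token, number of tokens, PK] tuple per token.
        static List<Object[]> tokenizeLengthPartitioned(String secondaryKey, Object primaryKey) {
            String[] tokens = secondaryKey.split("\\s+");
            List<Object[]> tuples = new ArrayList<>();
            for (String token : tokens) {
                tuples.add(new Object[] { token, tokens.length, primaryKey });
            }
            return tuples;
        }

        public static void main(String[] args) {
            tokenizeKeyword("fast flexible store", 42)
                    .forEach(t -> System.out.println(Arrays.toString(t)));
            tokenizeLengthPartitioned("fast flexible store", 42)
                    .forEach(t -> System.out.println(Arrays.toString(t)));
        }
    }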

Change-Id: Ifb591754dba5eb4a9207edaa4e658f4cc745893a
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/78
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index e15ffe0..bface33 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -17,7 +17,6 @@
 public enum LogicalOperatorTag {
     AGGREGATE,
     ASSIGN,
-    CLUSTER,
     DATASOURCESCAN,
     DISTINCT,
     DISTRIBUTE_RESULT,
@@ -46,5 +45,6 @@
     INDEX_INSERT_DELETE,
     UPDATE,
     EXTENSION_OPERATOR,
-    EXTERNAL_LOOKUP
+    EXTERNAL_LOOKUP,
+    TOKENIZE
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index c41107b..48f8230 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -19,6 +19,8 @@
     ASSIGN,
     BROADCAST_EXCHANGE,
     BTREE_SEARCH,
+    BULKLOAD,
+    INDEX_BULKLOAD,
     STATS,
     DATASOURCE_SCAN,
     DISTRIBUTE_RESULT,
@@ -62,5 +64,6 @@
     LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH,
     PARTITIONINGSPLIT,
     EXTENSION_OPERATOR,
-    EXTERNAL_LOOKUP
+    EXTERNAL_LOOKUP,
+    TOKENIZE
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 997053f..129201d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -64,7 +64,7 @@
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
             LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
+            JobGenContext context, JobSpecification jobSpec, boolean bulkload) throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
@@ -74,7 +74,7 @@
     /**
      * Creates the insert runtime of IndexInsertDeletePOperator, which models
      * insert/delete operations into a secondary index.
-     * 
+     *
      * @param dataSource
      *            Target secondary index.
      * @param propagatedSchema
@@ -108,12 +108,12 @@
             IDataSourceIndex<I, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
             IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
             List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification spec) throws AlgebricksException;
+            JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException;
 
     /**
      * Creates the delete runtime of IndexInsertDeletePOperator, which models
      * insert/delete operations into a secondary index.
-     * 
+     *
      * @param dataSource
      *            Target secondary index.
      * @param propagatedSchema
@@ -149,6 +149,42 @@
             List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
             JobGenContext context, JobSpecification spec) throws AlgebricksException;
 
+    /**
+     * Creates the TokenizeOperator for IndexInsertDeletePOperator, which tokenizes
+     * the secondary key into [token, number of tokens] pairs for a length-partitioned index.
+     * For a non-length-partitioned index, it tokenizes the secondary key into [token].
+     *
+     * @param dataSource
+     *            Target secondary index.
+     * @param propagatedSchema
+     *            Output schema of the tokenize operator to be created.
+     * @param inputSchemas
+     *            Output schemas of the inputs to the tokenize operator to be created.
+     * @param typeEnv
+     *            Type environment of the original IndexInsertDeleteOperator operator.
+     * @param primaryKeys
+     *            Variables for the primary keys of the dataset that the dataSource secondary index belongs to.
+     * @param secondaryKeys
+     *            Variables for the secondary-index keys.
+     * @param filterExpr
+     *            Filtering expression to be pushed inside the runtime op.
+     *            Such a filter may, e.g., exclude NULLs from being inserted/deleted.
+     * @param recordDesc
+     *            Output record descriptor of the runtime op to be created.
+     * @param context
+     *            Job generation context.
+     * @param spec
+     *            Target job specification.
+     * @return
+     *         A Hyracks IOperatorDescriptor and its partition constraint.
+     * @throws AlgebricksException
+     */
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
+            IDataSourceIndex<I, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException;
+
     public IDataSourceIndex<I, S> findDataSourceIndex(I indexId, S dataSourceId) throws AlgebricksException;
 
     public IFunctionInfo lookupFunction(FunctionIdentifier fid);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 78efd0d..c6e93fe 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java
index ea45cc6..c58215e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
@@ -35,19 +36,24 @@
 
     private final IDataSourceIndex<?, ?> dataSourceIndex;
     private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
+    // In the bulk-load case on an ngram or keyword index,
+    // this contains [token, number of tokens] or [token].
+    // Otherwise, it contains the secondary-key expressions.
     private final List<Mutable<ILogicalExpression>> secondaryKeyExprs;
     private final Mutable<ILogicalExpression> filterExpr;
     private final Kind operation;
+    private final boolean bulkload;
     private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
 
     public IndexInsertDeleteOperator(IDataSourceIndex<?, ?> dataSourceIndex,
             List<Mutable<ILogicalExpression>> primaryKeyExprs, List<Mutable<ILogicalExpression>> secondaryKeyExprs,
-            Mutable<ILogicalExpression> filterExpr, Kind operation) {
+            Mutable<ILogicalExpression> filterExpr, Kind operation, boolean bulkload) {
         this.dataSourceIndex = dataSourceIndex;
         this.primaryKeyExprs = primaryKeyExprs;
         this.secondaryKeyExprs = secondaryKeyExprs;
         this.filterExpr = filterExpr;
         this.operation = operation;
+        this.bulkload = bulkload;
     }
 
     @Override
@@ -105,6 +111,10 @@
         return dataSourceIndex;
     }
 
+    public String getIndexName() {
+        return dataSourceIndex.getId().toString();
+    }
+
     public List<Mutable<ILogicalExpression>> getSecondaryKeyExpressions() {
         return secondaryKeyExprs;
     }
@@ -117,6 +127,10 @@
         return operation;
     }
 
+    public boolean isBulkload() {
+        return bulkload;
+    }
+
     public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
         this.additionalFilteringExpressions = additionalFilteringExpressions;
     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
index 94fd197..134506c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -38,17 +38,23 @@
     }
 
     private final IDataSource<?> dataSource;
+
     private final Mutable<ILogicalExpression> payloadExpr;
+
     private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
+
     private final Kind operation;
     private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
 
-    public InsertDeleteOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payload,
-            List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation) {
+    private final boolean bulkload;
+
+    public InsertDeleteOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
+            List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation, boolean bulkload) {
         this.dataSource = dataSource;
-        this.payloadExpr = payload;
+        this.payloadExpr = payloadExpr;
         this.primaryKeyExprs = primaryKeyExprs;
         this.operation = operation;
+        this.bulkload = bulkload;
     }
 
     @Override
@@ -58,15 +64,13 @@
     }
 
     @Override
-    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
-        boolean b = false;
-        b = visitor.transform(payloadExpr);
-        for (int i = 0; i < primaryKeyExprs.size(); i++) {
-            if (visitor.transform(primaryKeyExprs.get(i))) {
-                b = true;
-            }
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
+        boolean changed = false;
+        changed = transform.transform(payloadExpr);
+        for (Mutable<ILogicalExpression> e : primaryKeyExprs) {
+            changed |= transform.transform(e);
         }
-        return b;
+        return changed;
     }
 
     @Override
@@ -110,6 +114,10 @@
         return operation;
     }
 
+    public boolean isBulkload() {
+        return bulkload;
+    }
+
     public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
         this.additionalFilteringExpressions = additionalFilteringExpressions;
     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SinkOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SinkOperator.java
index bdad6d4..84ad33b 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SinkOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SinkOperator.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,8 +15,10 @@
 package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
 
 import java.util.ArrayList;
+import java.util.List;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
@@ -24,6 +26,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypeEnvPointer;
 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.NonPropagatingTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.OpRefTypeEnvPointer;
 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
@@ -33,7 +36,14 @@
 
     @Override
     public void recomputeSchema() throws AlgebricksException {
-        schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
+        schema = new ArrayList<LogicalVariable>();
+        for (int i = 0; i < inputs.size(); i++) {
+            for (LogicalVariable v : inputs.get(i).getValue().getSchema()) {
+                if (!schema.contains(v)) {
+                    schema.add(v);
+                }
+            }
+        }
     }
 
     @Override
@@ -58,11 +68,14 @@
 
     @Override
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
-        ITypeEnvPointer[] envPointers = new ITypeEnvPointer[1];
-        envPointers[0] = new OpRefTypeEnvPointer(inputs.get(0), ctx);
-        PropagatingTypeEnvironment env = new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
-                ctx.getNullableTypeComputer(), ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);
-        return env;
+        ITypeEnvPointer[] envPointers = new ITypeEnvPointer[inputs.size()];
+        for (int i = 0; i < inputs.size(); i++) {
+            envPointers[i] = new OpRefTypeEnvPointer(inputs.get(i), ctx);
+        }
+        PropagatingTypeEnvironment env = new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
+                ctx.getNullableTypeComputer(), ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);
+        return env;
+
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java
new file mode 100644
index 0000000..82c1035
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class TokenizeOperator extends AbstractLogicalOperator {
+
+    private final IDataSourceIndex<?, ?> dataSourceIndex;
+    private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
+    private final List<Mutable<ILogicalExpression>> secondaryKeyExprs;
+    // Logical variables for the token and, in the length-partitioned case, the number of tokens
+    private final List<LogicalVariable> tokenizeVars;
+    private final Mutable<ILogicalExpression> filterExpr;
+    private final Kind operation;
+    private final boolean bulkload;
+    private final boolean isPartitioned;
+    // The type of each variable in tokenizeVars
+    private final List<Object> tokenizeVarTypes;
+    private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
+
+    public TokenizeOperator(IDataSourceIndex<?, ?> dataSourceIndex,
+            List<Mutable<ILogicalExpression>> primaryKeyExprs,
+            List<Mutable<ILogicalExpression>> secondaryKeyExprs,
+            List<LogicalVariable> tokenizeVars,
+            Mutable<ILogicalExpression> filterExpr, Kind operation,
+            boolean bulkload, boolean isPartitioned,
+            List<Object> tokenizeVarTypes) {
+        this.dataSourceIndex = dataSourceIndex;
+        this.primaryKeyExprs = primaryKeyExprs;
+        this.secondaryKeyExprs = secondaryKeyExprs;
+        this.tokenizeVars = tokenizeVars;
+        this.filterExpr = filterExpr;
+        this.operation = operation;
+        this.bulkload = bulkload;
+        this.isPartitioned = isPartitioned;
+        this.tokenizeVarTypes = tokenizeVarTypes;
+    }
+
+    @Override
+    public void recomputeSchema() throws AlgebricksException {
+        schema = new ArrayList<LogicalVariable>();
+        schema.addAll(inputs.get(0).getValue().getSchema());
+        schema.addAll(tokenizeVars);
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(
+            ILogicalExpressionReferenceTransform visitor)
+            throws AlgebricksException {
+        boolean b = false;
+        for (int i = 0; i < primaryKeyExprs.size(); i++) {
+            if (visitor.transform(primaryKeyExprs.get(i))) {
+                b = true;
+            }
+        }
+        for (int i = 0; i < secondaryKeyExprs.size(); i++) {
+            if (visitor.transform(secondaryKeyExprs.get(i))) {
+                b = true;
+            }
+        }
+        return b;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg)
+            throws AlgebricksException {
+        return visitor.visitTokenizeOperator(this, arg);
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return new VariablePropagationPolicy() {
+
+            @Override
+            public void propagateVariables(IOperatorSchema target,
+                    IOperatorSchema... sources) throws AlgebricksException {
+                target.addAllVariables(sources[0]);
+                for (LogicalVariable v : tokenizeVars) {
+                    target.addVariable(v);
+                }
+            }
+        };
+
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.TOKENIZE;
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(
+            ITypingContext ctx) throws AlgebricksException {
+        IVariableTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
+
+        // If the secondary index is not length-partitioned, create one new
+        // output variable: token.
+        // If the secondary index is length-partitioned, create two new output
+        // variables: token and number of tokens.
+        // The type of the token variable is the same as the type of the
+        // secondary key; if the secondary key is of a list type, the element
+        // type of the list is used.
+        // Along with the PK, the tokenizer generates [token, number of tokens,
+        // PK] tuples.
+
+        for (int i = 0; i < tokenizeVars.size(); i++) {
+            env.setVarType(tokenizeVars.get(i), tokenizeVarTypes.get(i));
+        }
+
+        return env;
+    }
+
+    public List<Mutable<ILogicalExpression>> getPrimaryKeyExpressions() {
+        return primaryKeyExprs;
+    }
+
+    public IDataSourceIndex<?, ?> getDataSourceIndex() {
+        return dataSourceIndex;
+    }
+
+    public List<Mutable<ILogicalExpression>> getSecondaryKeyExpressions() {
+        return secondaryKeyExprs;
+    }
+
+    public List<LogicalVariable> getTokenizeVars() {
+        return tokenizeVars;
+    }
+
+    public Mutable<ILogicalExpression> getFilterExpression() {
+        return filterExpr;
+    }
+
+    public Kind getOperation() {
+        return operation;
+    }
+
+    public boolean isBulkload() {
+        return bulkload;
+    }
+
+    public boolean isPartitioned() {
+        return isPartitioned;
+    }
+
+    public List<Object> getTokenizeVarTypes() {
+        return tokenizeVarTypes;
+    }
+
+    public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
+        this.additionalFilteringExpressions = additionalFilteringExpressions;
+    }
+
+    public List<Mutable<ILogicalExpression>> getAdditionalFilteringExpressions() {
+        return additionalFilteringExpressions;
+    }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index ae0346e..266ead8 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -67,6 +67,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -500,6 +501,13 @@
     }
 
     @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        setEmptyFDsEqClasses(op, ctx);
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, IOptimizationContext ctx) throws AlgebricksException {
         setEmptyFDsEqClasses(op, ctx);
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index b56eb8c..804c0e7 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -59,6 +59,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -468,6 +469,19 @@
     }
 
     @Override
+    public Boolean visitTokenizeOperator(TokenizeOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.TOKENIZE)
+            return Boolean.FALSE;
+        TokenizeOperator tokenizeOpArg = (TokenizeOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), tokenizeOpArg.getSchema());
+        if (!op.getDataSourceIndex().equals(tokenizeOpArg.getDataSourceIndex()))
+            isomorphic = false;
+        return isomorphic;
+    }
+
+    @Override
     public Boolean visitSinkOperator(SinkOperator op, ILogicalOperator arg) throws AlgebricksException {
         return true;
     }
@@ -786,7 +800,7 @@
             List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
             deepCopyExpressionRefs(newKeyExpressions, op.getAdditionalFilteringExpressions());
             InsertDeleteOperator insertDeleteOp = new InsertDeleteOperator(op.getDataSource(),
-                    deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions, op.getOperation());
+                    deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions, op.getOperation(), op.isBulkload());
             insertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
             return insertDeleteOp;
         }
@@ -803,11 +817,33 @@
             List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
             deepCopyExpressionRefs(newLSMComponentFilterExpressions, op.getAdditionalFilteringExpressions());
             IndexInsertDeleteOperator indexInsertDeleteOp = new IndexInsertDeleteOperator(op.getDataSourceIndex(),
-                    newPrimaryKeyExpressions, newSecondaryKeyExpressions, newFilterExpression, op.getOperation());
+                    newPrimaryKeyExpressions, newSecondaryKeyExpressions, newFilterExpression, op.getOperation(), op.isBulkload());
             indexInsertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
             return indexInsertDeleteOp;
+        }
+
+        @Override
+        public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Void arg)
+                throws AlgebricksException {
+            List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+            deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
+            List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+            deepCopyExpressionRefs(newSecondaryKeyExpressions, op.getSecondaryKeyExpressions());
+            List<LogicalVariable> newTokenizeVars = new ArrayList<LogicalVariable>();
+            deepCopyVars(newTokenizeVars, op.getTokenizeVars());
+            Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(
+                    ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
+            List<Object> newTokenizeVarTypes = new ArrayList<Object>();
+            deepCopyObjects(newTokenizeVarTypes, op.getTokenizeVarTypes());
+
+            TokenizeOperator tokenizeOp = new TokenizeOperator(op.getDataSourceIndex(),
+                    newPrimaryKeyExpressions, newSecondaryKeyExpressions,
+                    newTokenizeVars, newFilterExpression, op.getOperation(),
+                    op.isBulkload(), op.isPartitioned(), newTokenizeVarTypes);
+            return tokenizeOp;
         }
 
+
         @Override
         public ILogicalOperator visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
             return new SinkOperator();
@@ -825,6 +861,18 @@
                     ((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression());
         }
 
+        private List<LogicalVariable> deepCopyVars(List<LogicalVariable> newVars, List<LogicalVariable> oldVars) {
+            for (LogicalVariable oldVar : oldVars)
+                newVars.add(oldVar);
+            return newVars;
+        }
+
+        private List<Object> deepCopyObjects(List<Object> newObjs, List<Object> oldObjs) {
+            for (Object oldObj : oldObjs)
+                newObjs.add(oldObj);
+            return newObjs;
+        }
+
         private List<Pair<IOrder, Mutable<ILogicalExpression>>> deepCopyOrderAndExpression(
                 List<Pair<IOrder, Mutable<ILogicalExpression>>> ordersAndExprs) {
             List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrdersAndExprs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index ef91b38..ea8e82b 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -57,6 +57,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -251,6 +252,13 @@
     }
 
     @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        mapVariablesStandard(op, arg);
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 225edd6..388d9f9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -49,6 +49,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -260,6 +261,13 @@
     }
 
     @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, IOptimizationContext arg)
+            throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 3b8ed56..bf184e9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -52,6 +52,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -244,6 +245,12 @@
     }
 
     @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+        producedVariables.addAll(op.getTokenizeVars());
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
         return null;
     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index b8cc3b8..b31bc3b 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -53,6 +53,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -282,6 +283,12 @@
     }
 
     @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+        standardLayout(op);
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
         standardLayout(op);
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 729a996..b2d6022 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -54,6 +54,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -400,6 +401,19 @@
     }
 
     @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        for (Mutable<ILogicalExpression> e : op.getSecondaryKeyExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 2bd9205..0209492 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -53,6 +53,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -357,6 +358,17 @@
     }
 
     @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
+            e.getValue().getUsedVariables(usedVariables);
+        }
+        for (Mutable<ILogicalExpression> e : op.getSecondaryKeyExpressions()) {
+            e.getValue().getUsedVariables(usedVariables);
+        }
+        return null;
+    }
+
+    @Override
     public Void visitSinkOperator(SinkOperator op, Void arg) {
         return null;
     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index 4407b75..66e7b98 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -70,6 +70,14 @@
         return new PhysicalRequirements(req, IPartitioningRequirementsCoordinator.NO_COORDINATION);
     }
 
+    protected PhysicalRequirements emptyUnaryRequirements(int numberOfChildren) {
+        StructuralPropertiesVector[] req = new StructuralPropertiesVector[numberOfChildren];
+        for (int i = 0; i < numberOfChildren; i++) {
+            req[i] = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
+        }
+        return new PhysicalRequirements(req, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
     @Override
     public void disableJobGenBelowMe() {
         this.disableJobGenBelow = true;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
new file mode 100644
index 0000000..f8fdd46
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
@@ -0,0 +1,105 @@
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class BulkloadPOperator extends AbstractPhysicalOperator {
+
+    private final LogicalVariable payload;
+    private final List<LogicalVariable> primaryKeys;
+    private final List<LogicalVariable> additionalFilteringKeys;
+    private final IDataSource<?> dataSource;
+
+    public BulkloadPOperator(LogicalVariable payload, List<LogicalVariable> keys,
+           List<LogicalVariable> additionalFilteringKeys, IDataSource<?> dataSource) {
+        this.payload = payload;
+        this.primaryKeys = keys;
+        this.additionalFilteringKeys = additionalFilteringKeys;
+        this.dataSource = dataSource;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.BULKLOAD;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        List<LogicalVariable> scanVariables = new ArrayList<>();
+        scanVariables.addAll(primaryKeys);
+        scanVariables.add(new LogicalVariable(-1));
+        IPhysicalPropertiesVector physicalProps = dataSource.getPropertiesProvider().computePropertiesVector(
+                scanVariables);
+        StructuralPropertiesVector spv = new StructuralPropertiesVector(physicalProps.getPartitioningProperty(),
+                physicalProps.getLocalProperties());
+        return new PhysicalRequirements(new IPhysicalPropertiesVector[] { spv },
+                IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op;
+        assert insertDeleteOp.getOperation() == Kind.INSERT;
+        assert insertDeleteOp.isBulkload();
+
+        IMetadataProvider mp = context.getMetadataProvider();
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+        JobSpecification spec = builder.getJobSpec();
+        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getInsertRuntime(
+                dataSource, propagatedSchema, typeEnv, primaryKeys, payload, additionalFilteringKeys,
+                inputDesc, context, spec, true);
+        builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
+        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, insertDeleteOp, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
new file mode 100644
index 0000000..e0d8d79
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -0,0 +1,126 @@
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class IndexBulkloadPOperator extends AbstractPhysicalOperator {
+
+    private final List<LogicalVariable> primaryKeys;
+    private final List<LogicalVariable> secondaryKeys;
+    private final List<LogicalVariable> additionalFilteringKeys;
+    private final Mutable<ILogicalExpression> filterExpr;
+    private final IDataSourceIndex<?, ?> dataSourceIndex;
+
+    public IndexBulkloadPOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
+            IDataSourceIndex<?, ?> dataSourceIndex) {
+        this.primaryKeys = primaryKeys;
+        this.secondaryKeys = secondaryKeys;
+        this.additionalFilteringKeys = additionalFilteringKeys;
+        this.filterExpr = filterExpr;
+        this.dataSourceIndex = dataSourceIndex;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.INDEX_BULKLOAD;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        List<LogicalVariable> scanVariables = new ArrayList<>();
+        scanVariables.addAll(primaryKeys);
+        scanVariables.add(new LogicalVariable(-1));
+        IPhysicalPropertiesVector physicalProps = dataSourceIndex.getDataSource().getPropertiesProvider()
+                .computePropertiesVector(scanVariables);
+        List<ILocalStructuralProperty> localProperties = new ArrayList<>();
+        // Data needs to be sorted on [token, number of tokens, PK],
+        // or on [token, PK] if the index is not partitioned.
+        for (LogicalVariable skVar : secondaryKeys) {
+            localProperties.add(new LocalOrderProperty(new OrderColumn(skVar,
+                    OrderKind.ASC)));
+        }
+        for (LogicalVariable pkVar : primaryKeys) {
+            localProperties.add(new LocalOrderProperty(new OrderColumn(pkVar,
+                    OrderKind.ASC)));
+        }
+        StructuralPropertiesVector spv = new StructuralPropertiesVector(physicalProps.getPartitioningProperty(),
+                localProperties);
+        return new PhysicalRequirements(new IPhysicalPropertiesVector[] { spv },
+                IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        IndexInsertDeleteOperator indexInsertDeleteOp = (IndexInsertDeleteOperator) op;
+        assert indexInsertDeleteOp.getOperation() == Kind.INSERT;
+        assert indexInsertDeleteOp.isBulkload();
+
+        IMetadataProvider mp = context.getMetadataProvider();
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+        JobSpecification spec = builder.getJobSpec();
+        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getIndexInsertRuntime(
+                dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys, secondaryKeys,
+                additionalFilteringKeys, null, inputDesc, context, spec, true);
+        builder.contributeHyracksOperator(indexInsertDeleteOp, runtimeAndConstraints.first);
+        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        ILogicalOperator src = indexInsertDeleteOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, indexInsertDeleteOp, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+
+
+}
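
The required-properties vector above means the child of an index bulkload must already deliver rows ordered on the secondary keys followed by the primary keys. A minimal, framework-free sketch of that ordering contract in plain Java (the token/pk record below is a hypothetical stand-in, not part of this change):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public final class IndexBulkloadOrderSketch {

        // Hypothetical secondary-index entry: one secondary key (token) plus the primary key.
        static final class Entry {
            final String token;
            final int pk;

            Entry(String token, int pk) {
                this.token = token;
                this.pk = pk;
            }

            @Override
            public String toString() {
                return "(" + token + ", " + pk + ")";
            }
        }

        public static void main(String[] args) {
            List<Entry> entries = new ArrayList<Entry>();
            entries.add(new Entry("database", 2));
            entries.add(new Entry("asterix", 3));
            entries.add(new Entry("asterix", 1));

            // The order IndexBulkloadPOperator requires from its child:
            // ascending on every secondary key, then ascending on every primary key.
            Collections.sort(entries, new Comparator<Entry>() {
                @Override
                public int compare(Entry a, Entry b) {
                    int c = a.token.compareTo(b.token);
                    return c != 0 ? c : Integer.compare(a.pk, b.pk);
                }
            });

            // Prints: [(asterix, 1), (asterix, 3), (database, 2)]
            System.out.println(entries);
        }
    }
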
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
index b56d638..4bc4c06 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
@@ -108,7 +108,7 @@
         IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(insertDeleteOp);
         if (insertDeleteOp.getOperation() == Kind.INSERT) {
             runtimeAndConstraints = mp.getIndexInsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
-                    primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec);
+                    primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec, false);
         } else {
             runtimeAndConstraints = mp.getIndexDeleteRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
                     primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
index d254151..993e8d7 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
@@ -97,7 +97,7 @@
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
         if (insertDeleteOp.getOperation() == Kind.INSERT) {
             runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
-                    additionalFilteringKeys, inputDesc, context, spec);
+                    additionalFilteringKeys, inputDesc, context, spec, false);
         } else {
             runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
                     additionalFilteringKeys, inputDesc, context, spec);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
index aa24221..ba299ac 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -14,13 +14,21 @@
  */
 package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
@@ -28,6 +36,9 @@
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
 
 public class SinkPOperator extends AbstractPhysicalOperator {
 
@@ -38,30 +49,45 @@
 
     @Override
     public boolean isMicroOperator() {
-        return true;
+        return false;
     }
 
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+        AbstractLogicalOperator op2;
+        List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
+        IPhysicalPropertiesVector childsProperties = null;
+
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            op2 = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
+            childsProperties = op2.getPhysicalOperator().getDeliveredProperties();
+            if (childsProperties.getLocalProperties() != null) {
+                propsLocal.addAll(childsProperties.getLocalProperties());
+            }
+        }
+
+        deliveredProperties = new StructuralPropertiesVector(childsProperties.getPartitioningProperty(), propsLocal);
     }
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent) {
-        return emptyUnaryRequirements();
+        return emptyUnaryRequirements(op.getInputs().size());
     }
 
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
-        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
-        SinkRuntimeFactory runtime = new SinkRuntimeFactory();
-        builder.contributeMicroOperator(op, runtime, recDesc);
-        ILogicalOperator src = op.getInputs().get(0).getValue();
-        builder.contributeGraphEdge(src, 0, op, 0);
+
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+
+        SinkOperatorDescriptor opDesc = new SinkOperatorDescriptor(spec, op.getInputs().size());
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            builder.contributeGraphEdge(op.getInputs().get(i).getValue(), 0, op, i);
+        }
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
new file mode 100644
index 0000000..ef8cd0a
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
@@ -0,0 +1,93 @@
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class TokenizePOperator extends AbstractPhysicalOperator {
+
+    private final List<LogicalVariable> primaryKeys;
+    private final List<LogicalVariable> secondaryKeys;
+    private final IDataSourceIndex<?, ?> dataSourceIndex;
+
+    public TokenizePOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            IDataSourceIndex<?, ?> dataSourceIndex) {
+        this.primaryKeys = primaryKeys;
+        this.secondaryKeys = secondaryKeys;
+        this.dataSourceIndex = dataSourceIndex;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.TOKENIZE;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        TokenizeOperator tokenizeOp = (TokenizeOperator) op;
+        if (tokenizeOp.getOperation() != Kind.INSERT || !tokenizeOp.isBulkload()) {
+            throw new AlgebricksException("Tokenize Operator only works when bulk-loading data.");
+        }
+
+        IMetadataProvider mp = context.getMetadataProvider();
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+        JobSpecification spec = builder.getJobSpec();
+        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getTokenizerRuntime(
+                dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys, secondaryKeys, null, inputDesc,
+                context, spec, true);
+        builder.contributeHyracksOperator(tokenizeOp, runtimeAndConstraints.first);
+        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        ILogicalOperator src = tokenizeOp.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, tokenizeOp, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}
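
TokenizePOperator itself only contributes the tokenizer runtime obtained from the metadata provider; conceptually, during a bulk load it expands each (primary key, text) record into one (token, primary key) record per token, which must then be sorted on [token, PK] before the secondary-index bulk load (per the local-order requirement in IndexBulkloadPOperator above). A small plain-Java sketch of that expansion, with illustrative field names and a whitespace tokenizer standing in for the index's actual tokenizer:

    import java.util.ArrayList;
    import java.util.List;

    public final class TokenizeSketch {

        // Hypothetical bulk-load input record: primary key plus a text field.
        static final class Record {
            final int pk;
            final String text;

            Record(int pk, String text) {
                this.pk = pk;
                this.text = text;
            }
        }

        // Expand each record into one (token, pk) pair per token.
        static List<String[]> tokenize(List<Record> records) {
            List<String[]> out = new ArrayList<String[]>();
            for (Record r : records) {
                for (String token : r.text.toLowerCase().split("\\s+")) {
                    out.add(new String[] { token, Integer.toString(r.pk) });
                }
            }
            return out;
        }

        public static void main(String[] args) {
            List<Record> records = new ArrayList<Record>();
            records.add(new Record(1, "asterix data management"));
            records.add(new Record(2, "hyracks data"));

            // Produces (asterix, 1) (data, 1) (management, 1) (hyracks, 2) (data, 2);
            // a downstream sort on [token, pk] then feeds the index bulk load.
            for (String[] pair : tokenize(records)) {
                System.out.println("(" + pair[0] + ", " + pair[1] + ")");
            }
        }
    }
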
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 1ca6d26..a118ee8 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -51,6 +51,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -59,7 +60,8 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
-public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisitor<String, Integer> {
+public class LogicalOperatorPrettyPrintVisitor implements
+        ILogicalOperatorVisitor<String, Integer> {
 
     ILogicalExpressionVisitor<String, Integer> exprVisitor;
 
@@ -67,35 +69,42 @@
         exprVisitor = new LogicalExpressionPrettyPrintVisitor();
     }
 
-    public LogicalOperatorPrettyPrintVisitor(ILogicalExpressionVisitor<String, Integer> exprVisitor) {
+    public LogicalOperatorPrettyPrintVisitor(
+            ILogicalExpressionVisitor<String, Integer> exprVisitor) {
         this.exprVisitor = exprVisitor;
     }
 
     @Override
-    public String visitAggregateOperator(AggregateOperator op, Integer indent) throws AlgebricksException {
+    public String visitAggregateOperator(AggregateOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("aggregate ").append(op.getVariables()).append(" <- ");
+        addIndent(buffer, indent).append("aggregate ")
+                .append(op.getVariables()).append(" <- ");
         pprintExprList(op.getExpressions(), buffer, indent);
         return buffer.toString();
     }
 
     @Override
-    public String visitRunningAggregateOperator(RunningAggregateOperator op, Integer indent) throws AlgebricksException {
+    public String visitRunningAggregateOperator(RunningAggregateOperator op,
+            Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("running-aggregate ").append(op.getVariables()).append(" <- ");
+        addIndent(buffer, indent).append("running-aggregate ")
+                .append(op.getVariables()).append(" <- ");
         pprintExprList(op.getExpressions(), buffer, indent);
         return buffer.toString();
     }
 
     @Override
-    public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Integer indent) {
+    public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op,
+            Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("empty-tuple-source");
         return buffer.toString();
     }
 
     @Override
-    public String visitGroupByOperator(GroupByOperator op, Integer indent) throws AlgebricksException {
+    public String visitGroupByOperator(GroupByOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("group by (");
         pprintVeList(buffer, op.getGroupByList(), indent);
@@ -107,7 +116,8 @@
     }
 
     @Override
-    public String visitDistinctOperator(DistinctOperator op, Integer indent) throws AlgebricksException {
+    public String visitDistinctOperator(DistinctOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("distinct " + "(");
         pprintExprList(op.getExpressions(), buffer, indent);
@@ -116,62 +126,75 @@
     }
 
     @Override
-    public String visitInnerJoinOperator(InnerJoinOperator op, Integer indent) throws AlgebricksException {
+    public String visitInnerJoinOperator(InnerJoinOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("join (").append(op.getCondition().getValue().accept(exprVisitor, indent))
-                .append(")");
+        addIndent(buffer, indent)
+                .append("join (")
+                .append(op.getCondition().getValue()
+                        .accept(exprVisitor, indent)).append(")");
         return buffer.toString();
     }
 
     @Override
-    public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Integer indent) throws AlgebricksException {
+    public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op,
+            Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("left outer join (")
-                .append(op.getCondition().getValue().accept(exprVisitor, indent)).append(")");
+        addIndent(buffer, indent)
+                .append("left outer join (")
+                .append(op.getCondition().getValue()
+                        .accept(exprVisitor, indent)).append(")");
         return buffer.toString();
     }
 
     @Override
-    public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Integer indent) {
+    public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op,
+            Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("nested tuple source");
         return buffer.toString();
     }
 
     @Override
-    public String visitOrderOperator(OrderOperator op, Integer indent) throws AlgebricksException {
+    public String visitOrderOperator(OrderOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("order ");
-        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
+        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op
+                .getOrderExpressions()) {
             String fst;
             switch (p.first.getKind()) {
-                case ASC: {
-                    fst = "ASC";
-                    break;
-                }
-                case DESC: {
-                    fst = "DESC";
-                    break;
-                }
-                default: {
-                    fst = p.first.getExpressionRef().toString();
-                }
+            case ASC: {
+                fst = "ASC";
+                break;
             }
-            buffer.append("(" + fst + ", " + p.second.getValue().accept(exprVisitor, indent) + ") ");
+            case DESC: {
+                fst = "DESC";
+                break;
+            }
+            default: {
+                fst = p.first.getExpressionRef().toString();
+            }
+            }
+            buffer.append("(" + fst + ", "
+                    + p.second.getValue().accept(exprVisitor, indent) + ") ");
         }
         return buffer.toString();
     }
 
     @Override
-    public String visitAssignOperator(AssignOperator op, Integer indent) throws AlgebricksException {
+    public String visitAssignOperator(AssignOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("assign ").append(op.getVariables()).append(" <- ");
+        addIndent(buffer, indent).append("assign ").append(op.getVariables())
+                .append(" <- ");
         pprintExprList(op.getExpressions(), buffer, indent);
         return buffer.toString();
     }
 
     @Override
-    public String visitWriteOperator(WriteOperator op, Integer indent) throws AlgebricksException {
+    public String visitWriteOperator(WriteOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("write ");
         pprintExprList(op.getExpressions(), buffer, indent);
@@ -179,7 +202,8 @@
     }
 
     @Override
-    public String visitDistributeResultOperator(DistributeResultOperator op, Integer indent) throws AlgebricksException {
+    public String visitDistributeResultOperator(DistributeResultOperator op,
+            Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("distribute result ");
         pprintExprList(op.getExpressions(), buffer, indent);
@@ -187,32 +211,42 @@
     }
 
     @Override
-    public String visitWriteResultOperator(WriteResultOperator op, Integer indent) throws AlgebricksException {
+    public String visitWriteResultOperator(WriteResultOperator op,
+            Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("load ").append(op.getDataSource()).append(" from ")
-                .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
+        addIndent(buffer, indent)
+                .append("load ")
+                .append(op.getDataSource())
+                .append(" from ")
+                .append(op.getPayloadExpression().getValue()
+                        .accept(exprVisitor, indent))
+                .append(" partitioned by ");
         pprintExprList(op.getKeyExpressions(), buffer, indent);
         return buffer.toString();
     }
 
     @Override
-    public String visitSelectOperator(SelectOperator op, Integer indent) throws AlgebricksException {
+    public String visitSelectOperator(SelectOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("select (").append(op.getCondition().getValue().accept(exprVisitor, indent))
-                .append(")");
+        addIndent(buffer, indent)
+                .append("select (")
+                .append(op.getCondition().getValue()
+                        .accept(exprVisitor, indent)).append(")");
         return buffer.toString();
     }
 
     @Override
     public String visitProjectOperator(ProjectOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("project " + "(" + op.getVariables() + ")");
+        addIndent(buffer, indent).append(
+                "project " + "(" + op.getVariables() + ")");
         return buffer.toString();
     }
 
     @Override
-    public String visitPartitioningSplitOperator(PartitioningSplitOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitPartitioningSplitOperator(PartitioningSplitOperator op,
+            Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("partitioning-split (");
         pprintExprList(op.getExpressions(), buffer, indent);
@@ -221,7 +255,8 @@
     }
 
     @Override
-    public String visitSubplanOperator(SubplanOperator op, Integer indent) throws AlgebricksException {
+    public String visitSubplanOperator(SubplanOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("subplan {");
         printNestedPlans(op, indent, buffer);
@@ -232,44 +267,58 @@
     public String visitUnionOperator(UnionAllOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("union");
-        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> v : op.getVariableMappings()) {
-            buffer.append(" (" + v.first + ", " + v.second + ", " + v.third + ")");
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> v : op
+                .getVariableMappings()) {
+            buffer.append(" (" + v.first + ", " + v.second + ", " + v.third
+                    + ")");
         }
         return buffer.toString();
     }
 
     @Override
-    public String visitUnnestOperator(UnnestOperator op, Integer indent) throws AlgebricksException {
+    public String visitUnnestOperator(UnnestOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("unnest " + op.getVariable());
         if (op.getPositionalVariable() != null) {
             buffer.append(" at " + op.getPositionalVariable());
         }
-        buffer.append(" <- " + op.getExpressionRef().getValue().accept(exprVisitor, indent));
+        buffer.append(" <- "
+                + op.getExpressionRef().getValue().accept(exprVisitor, indent));
         return buffer.toString();
     }
 
     @Override
-    public String visitUnnestMapOperator(UnnestMapOperator op, Integer indent) throws AlgebricksException {
+    public String visitUnnestMapOperator(UnnestMapOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append(
-                "unnest-map " + op.getVariables() + " <- "
-                        + op.getExpressionRef().getValue().accept(exprVisitor, indent));
+                "unnest-map "
+                        + op.getVariables()
+                        + " <- "
+                        + op.getExpressionRef().getValue()
+                                .accept(exprVisitor, indent));
         return buffer.toString();
     }
 
     @Override
-    public String visitDataScanOperator(DataSourceScanOperator op, Integer indent) {
+    public String visitDataScanOperator(DataSourceScanOperator op,
+            Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append(
-                "data-scan " + op.getProjectVariables() + "<-" + op.getVariables() + " <- " + op.getDataSource());
+                "data-scan " + op.getProjectVariables() + "<-"
+                        + op.getVariables() + " <- " + op.getDataSource());
         return buffer.toString();
     }
 
     @Override
-    public String visitLimitOperator(LimitOperator op, Integer indent) throws AlgebricksException {
+    public String visitLimitOperator(LimitOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("limit " + op.getMaxObjects().getValue().accept(exprVisitor, indent));
+        addIndent(buffer, indent).append(
+                "limit "
+                        + op.getMaxObjects().getValue()
+                                .accept(exprVisitor, indent));
         ILogicalExpression offset = op.getOffset().getValue();
         if (offset != null) {
             buffer.append(", " + offset.accept(exprVisitor, indent));
@@ -288,62 +337,91 @@
     public String visitScriptOperator(ScriptOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append(
-                "script (in: " + op.getInputVariables() + ") (out: " + op.getOutputVariables() + ")");
+                "script (in: " + op.getInputVariables() + ") (out: "
+                        + op.getOutputVariables() + ")");
         return buffer.toString();
     }
 
     @Override
-    public String visitReplicateOperator(ReplicateOperator op, Integer indent) throws AlgebricksException {
+    public String visitReplicateOperator(ReplicateOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("replicate ");
         return buffer.toString();
     }
 
     @Override
-    public String visitInsertDeleteOperator(InsertDeleteOperator op, Integer indent) throws AlgebricksException {
+    public String visitInsertDeleteOperator(InsertDeleteOperator op,
+            Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
-        addIndent(buffer, indent).append(header).append(op.getDataSource()).append(" from ")
-                .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
+        String header = op.getOperation() == Kind.INSERT ? "insert into "
+                : "delete from ";
+        addIndent(buffer, indent)
+                .append(header)
+                .append(op.getDataSource())
+                .append(" from ")
+                .append(op.getPayloadExpression().getValue()
+                        .accept(exprVisitor, indent))
+                .append(" partitioned by ");
         pprintExprList(op.getPrimaryKeyExpressions(), buffer, indent);
+        if (op.isBulkload()) {
+            buffer.append(" [bulkload]");
+        }
         return buffer.toString();
     }
 
     @Override
-    public String visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Integer indent)
+    public String visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op,
+            Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        String header = op.getOperation() == Kind.INSERT ? "insert into "
+                : "delete from ";
+        addIndent(buffer, indent).append(header).append(op.getIndexName())
+                .append(" on ").append(op.getDataSourceIndex().getDataSource())
+                .append(" from ");
+        pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+        if (op.isBulkload()) {
+            buffer.append(" [bulkload]");
+        }
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitTokenizeOperator(TokenizeOperator op, Integer indent)
             throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
-        addIndent(buffer, indent).append(header).append(op.getDataSourceIndex()).append(" from ");
+        addIndent(buffer, indent).append("tokenize ")
+                .append(op.getTokenizeVars()).append(" <- ");
         pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
-        buffer.append(" ");
-        pprintExprList(op.getPrimaryKeyExpressions(), buffer, indent);
         return buffer.toString();
     }
 
     @Override
-    public String visitSinkOperator(SinkOperator op, Integer indent) throws AlgebricksException {
+    public String visitSinkOperator(SinkOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("sink");
         return buffer.toString();
     }
 
     @Override
-    public String visitExtensionOperator(ExtensionOperator op, Integer indent) throws AlgebricksException {
+    public String visitExtensionOperator(ExtensionOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append(op.toString());
         return buffer.toString();
     }
 
-    protected static final StringBuilder addIndent(StringBuilder buffer, int level) {
+    protected static final StringBuilder addIndent(StringBuilder buffer,
+            int level) {
         for (int i = 0; i < level; ++i) {
             buffer.append(' ');
         }
         return buffer;
     }
 
-    protected void printNestedPlans(AbstractOperatorWithNestedPlans op, Integer indent, StringBuilder buffer)
-            throws AlgebricksException {
+    protected void printNestedPlans(AbstractOperatorWithNestedPlans op,
+            Integer indent, StringBuilder buffer) throws AlgebricksException {
         boolean first = true;
         if (op.getNestedPlans().isEmpty()) {
             buffer.append("}");
@@ -363,8 +441,9 @@
         }
     }
 
-    protected void pprintExprList(List<Mutable<ILogicalExpression>> expressions, StringBuilder buffer, Integer indent)
-            throws AlgebricksException {
+    protected void pprintExprList(
+            List<Mutable<ILogicalExpression>> expressions,
+            StringBuilder buffer, Integer indent) throws AlgebricksException {
         buffer.append("[");
         boolean first = true;
         for (Mutable<ILogicalExpression> exprRef : expressions) {
@@ -378,7 +457,9 @@
         buffer.append("]");
     }
 
-    protected void pprintVeList(StringBuilder sb, List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList,
+    protected void pprintVeList(
+            StringBuilder sb,
+            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList,
             Integer indent) throws AlgebricksException {
         sb.append("[");
         boolean fst = true;
@@ -398,11 +479,13 @@
     }
 
     @Override
-    public String visitExternalDataLookupOperator(ExternalDataLookupOperator op, Integer indent)
+    public String visitExternalDataLookupOperator(
+            ExternalDataLookupOperator op, Integer indent)
             throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append(
-                "external-instant-lookup " + op.getVariables() + " <- " + op.getExpressionRef().getValue());
+                "external-instant-lookup " + op.getVariables() + " <- "
+                        + op.getExpressionRef().getValue());
         return buffer.toString();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 3fdee94..c753042 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -40,6 +40,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -105,7 +106,9 @@
     public R visitInsertDeleteOperator(InsertDeleteOperator op, T tag) throws AlgebricksException;
 
     public R visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, T tag) throws AlgebricksException;
-    
+
     public R visitExternalDataLookupOperator(ExternalDataLookupOperator op, T arg) throws AlgebricksException;
 
+    public R visitTokenizeOperator(TokenizeOperator op, T arg) throws AlgebricksException;
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index 1fe6b15..e1f897c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index 4b965f5..099520d 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -174,11 +174,15 @@
 
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
-            IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema,
-            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
-            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
+            IDataSourceIndex<String, String> dataSource,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+            IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalNonKeyFields,
+            ILogicalExpression filterExpr,
+            RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec, boolean bulkload) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
@@ -195,6 +199,16 @@
     }
 
     @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
+            IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
     public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
         return FN_MAP.get(fid);
     }
@@ -203,7 +217,7 @@
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<String> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
             LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
+            JobGenContext context, JobSpecification jobSpec, boolean bulkload) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 58cb2ef..bc7255b 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -47,14 +47,17 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.BulkloadPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeletePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InsertDeletePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator;
@@ -71,6 +74,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamSelectPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StringStreamingScriptPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SubplanPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.TokenizePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.UnionAllPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.UnnestPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.WriteResultPOperator;
@@ -288,8 +292,13 @@
                         additionalFilteringKeys = new ArrayList<LogicalVariable>();
                         getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                     }
-                    op.setPhysicalOperator(new InsertDeletePOperator(payload, keys, additionalFilteringKeys, opLoad
-                            .getDataSource()));
+                    if (opLoad.isBulkload()) {
+                        op.setPhysicalOperator(new BulkloadPOperator(payload, keys, additionalFilteringKeys, opLoad
+                                .getDataSource()));
+                    } else {
+                        op.setPhysicalOperator(new InsertDeletePOperator(payload, keys, additionalFilteringKeys, opLoad
+                                .getDataSource()));
+                    }
                     break;
                 }
                 case INDEX_INSERT_DELETE: {
@@ -303,8 +312,28 @@
                         additionalFilteringKeys = new ArrayList<LogicalVariable>();
                         getKeys(opInsDel.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                     }
-                    op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys,
-                            additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+                    if (opInsDel.isBulkload()) {
+                        op.setPhysicalOperator(new IndexBulkloadPOperator(primaryKeys, secondaryKeys,
+                                additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+                    } else {
+                        op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys,
+                                additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+                    }
+
+                    break;
+
+                }
+                case TOKENIZE: {
+                    TokenizeOperator opTokenize = (TokenizeOperator) op;
+                    List<LogicalVariable> primaryKeys = new ArrayList<LogicalVariable>();
+                    List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
+                    getKeys(opTokenize.getPrimaryKeyExpressions(), primaryKeys);
+                    getKeys(opTokenize.getSecondaryKeyExpressions(), secondaryKeys);
+                    // The Tokenize operator is only used when bulk-loading a dataset that has an index.
+                    if (opTokenize.isBulkload()) {
+                        op.setPhysicalOperator(new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize
+                                .getDataSourceIndex()));
+                    }
                     break;
                 }
                 case SINK: {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java
deleted file mode 100644
index 62b7c96..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.context;
-
-import java.util.HashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-
-public class AsterixBTreeRegistry {
-
-    private HashMap<Integer, BTree> map = new HashMap<Integer, BTree>();
-    private Lock registryLock = new ReentrantLock();
-
-    public BTree get(int fileId) {
-        return map.get(fileId);
-    }
-
-    // TODO: not very high concurrency, but good enough for now
-    public void lock() {
-        registryLock.lock();
-    }
-
-    public void unlock() {
-        registryLock.unlock();
-    }
-
-    public void register(int fileId, BTree btree) {
-        map.put(fileId, btree);
-    }
-
-    public void unregister(int fileId) {
-        map.remove(fileId);
-    }
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index aaa6862..81052d6 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -33,13 +33,17 @@
 
     @Override
     public void close() throws HyracksDataException {
+        flushIfNotFailed();
+        writer.close();
+        appender.reset(frame, true);
+    }
+
+    protected void flushIfNotFailed() throws HyracksDataException {
         if (!failed) {
             if (appender.getTupleCount() > 0) {
                 FrameUtils.flushFrame(frame, writer);
             }
         }
-        writer.close();
-        appender.reset(frame, true);
     }
 
     protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb) throws HyracksDataException {
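
The refactoring above separates "flush any buffered tuples" from "close the downstream writer", so a runtime can flush at an intermediate point without tearing down the pipeline. A minimal standalone sketch of the same pattern, using simplified types rather than the actual Hyracks classes:

    import java.io.IOException;

    // Simplified illustration of the flush/close split introduced above.
    abstract class BufferedPushRuntime {
        protected boolean failed;
        protected int bufferedTupleCount;

        protected abstract void pushBufferToWriter() throws IOException;

        protected abstract void closeWriter() throws IOException;

        // Mirrors flushIfNotFailed(): flush only when no failure has been observed.
        protected void flushIfNotFailed() throws IOException {
            if (!failed && bufferedTupleCount > 0) {
                pushBufferToWriter();
                bufferedTupleCount = 0;
            }
        }

        // Mirrors the new close(): flush first, then close the writer.
        public void close() throws IOException {
            flushIfNotFailed();
            closeWriter();
        }
    }
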
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 3e87f31..c0ae029 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -71,12 +71,13 @@
                     }
                 }
                 if (flushFramesRapidly) {
-                    // Whenever all the tuples in the incoming frame have been consumed, the project operator 
-                    // will push its frame to the next operator; i.e., it won't wait until the frame gets full. 
+                    // Whenever all the tuples in the incoming frame have been consumed, the project operator
+                    // will push its frame to the next operator; i.e., it won't wait until the frame gets full.
                     appendProjectionToFrame(t, projectionList, true);
                 } else {
                     appendProjectionToFrame(t, projectionList);
                 }
+
             }
 
         };
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
index 8c72a3e..42356a5 100644
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -95,7 +95,7 @@
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
             LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
+            JobGenContext context, JobSpecification jobSpec, boolean bulkload) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
@@ -135,7 +135,7 @@
             IDataSourceIndex<T, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
             IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
             List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification spec) throws AlgebricksException {
+            JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
@@ -149,4 +149,14 @@
         // TODO Auto-generated method stub
         return null;
     }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
+            IDataSourceIndex<T, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
+            boolean bulkload) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }
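
The bulkload flag added to getInsertRuntime, the index insert/delete runtime method, and the new getTokenizerRuntime lets a metadata provider hand back a bulk-load runtime instead of the incremental one; the Hive provider simply stubs these methods out. A sketch of what a concrete provider might do with the flag, using hypothetical descriptor classes MyBulkLoadOperatorDescriptor and MyInsertOperatorDescriptor (neither is part of this change):

    @Override
    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
            IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
            LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor recordDesc,
            JobGenContext context, JobSpecification jobSpec, boolean bulkload) throws AlgebricksException {
        // Hypothetical descriptors; a real provider would build them from its own metadata.
        IOperatorDescriptor op = bulkload
                ? new MyBulkLoadOperatorDescriptor(jobSpec, recordDesc)
                : new MyInsertOperatorDescriptor(jobSpec, recordDesc);
        // Partition constraint omitted in this sketch.
        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, null);
    }
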
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index ac44c11..6edd647 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,7 +32,7 @@
  * the offset of the (i + 1)^th tuple. Every tuple is organized as a sequence of
  * ints indicating the end of each field in the tuple relative to the end of the
  * field slots.
- * 
+ *
  * @author vinayakb
  */
 public final class FrameTupleAccessor implements IFrameTupleAccessor {
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ShortSerializerDeserializer.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ShortSerializerDeserializer.java
index ad0cf93..acacd6a 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ShortSerializerDeserializer.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ShortSerializerDeserializer.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index d3d4833..ba8cfd6 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -82,6 +82,7 @@
                     throw new HyracksDataException(e);
                 }
             }
+
         };
     }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
new file mode 100644
index 0000000..fa0d874
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class SinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public SinkOperatorDescriptor(IOperatorDescriptorRegistry spec, int nInput) {
+        super(spec, nInput, 0);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new AbstractUnaryInputSinkOperatorNodePushable() {
+            @Override
+            public void open() throws HyracksDataException {
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+            }
+        };
+    }
+}
\ No newline at end of file
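
Per the change summary, the sink operator now accepts multiple inputs; SinkOperatorDescriptor above simply swallows whatever it receives, so a job with several load branches can end in a single root. An illustrative wiring, assuming two upstream operator descriptors opA and opB already exist in the JobSpecification spec:

    // Terminate two branches of the same job in one sink (illustrative only).
    SinkOperatorDescriptor sink = new SinkOperatorDescriptor(spec, 2);
    spec.connect(new OneToOneConnectorDescriptor(spec), opA, 0, sink, 0);
    spec.connect(new OneToOneConnectorDescriptor(spec), opB, 0, sink, 1);
    spec.addRoot(sink);
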
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index 1c1901d..3ea0ad9 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -38,6 +38,7 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.examples.btree.helper.DataGenOperatorDescriptor;
 import edu.uci.ics.hyracks.examples.btree.helper.IndexLifecycleManagerProvider;
@@ -46,12 +47,11 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 
 // This example will load a primary index from randomly generated data
 
-public class PrimaryIndexBulkLoadExample {   
+public class PrimaryIndexBulkLoadExample {
     private static class Options {
         @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
         public String host;
@@ -70,7 +70,7 @@
 
         @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
         public int sbSize = 32768;
-        
+
         @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
         public int frameSize = 32768;
     }
@@ -150,10 +150,10 @@
                                                  // etc.
         IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
         IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
-        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, recDesc,
                 storageManager, lcManagerProvider, btreeSplitProvider, typeTraits, comparatorFactories, null,
-                fieldPermutation, 0.7f, false, 1000L, true, dataflowHelperFactory,
-                NoOpOperationCallbackFactory.INSTANCE);
+                fieldPermutation, 0.7f, false, 1000L, true, dataflowHelperFactory);
+
         JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
 
         // distribute the records from the datagen via hashing to the bulk load
@@ -162,12 +162,13 @@
         hashFactories[0] = PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY);
         IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 }, hashFactories));
+        NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+        JobHelper.createPartitionConstraint(spec, nsOpDesc, splitNCs);
 
         spec.connect(hashConn, dataGen, 0, sorter, 0);
-
         spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
-
-        spec.addRoot(btreeBulkLoad);
+        spec.connect(new OneToOneConnectorDescriptor(spec), btreeBulkLoad, 0, nsOpDesc, 0);
+        spec.addRoot(nsOpDesc);
 
         return spec;
     }
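
Because TreeIndexBulkLoadOperatorDescriptor is now a 1-in/1-out operator (see its changes further below), it can no longer serve as a job root; this example and the ones that follow all terminate the pipeline with a NullSinkOperatorDescriptor instead. The recurring pattern, in outline (names taken from the example above):

    // The bulk load now forwards its input frames, so a null sink closes the pipeline.
    NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
    JobHelper.createPartitionConstraint(spec, nsOpDesc, splitNCs);
    spec.connect(new OneToOneConnectorDescriptor(spec), btreeBulkLoad, 0, nsOpDesc, 0);
    spec.addRoot(nsOpDesc);
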
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index 4826808..62d0487 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,6 +32,7 @@
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.examples.btree.helper.IndexLifecycleManagerProvider;
 import edu.uci.ics.hyracks.examples.btree.helper.StorageManagerInterface;
@@ -65,7 +66,7 @@
 
         @Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
         public int sbSize = 32768;
-        
+
         @Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
         public int frameSize = 32768;
     }
@@ -142,19 +143,18 @@
         // tuple
         int[] fieldPermutation = { 1, 0 };
         IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName);
-        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, null,
                 storageManager, lcManagerProvider, btreeSplitProvider, secondaryTypeTraits, comparatorFactories, null,
-                fieldPermutation, 0.7f, false, 1000L, true, dataflowHelperFactory,
-                NoOpOperationCallbackFactory.INSTANCE);
+                fieldPermutation, 0.7f, false, 1000L, true, dataflowHelperFactory);
         JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
+        NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+        JobHelper.createPartitionConstraint(spec, nsOpDesc, splitNCs);
 
         // connect the ops
-
         spec.connect(new OneToOneConnectorDescriptor(spec), btreeScanOp, 0, sorter, 0);
-
         spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
-
-        spec.addRoot(btreeBulkLoad);
+        spec.connect(new OneToOneConnectorDescriptor(spec), btreeBulkLoad, 0, nsOpDesc, 0);
+        spec.addRoot(nsOpDesc);
 
         return spec;
     }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index 19b749a..1eb0ee9 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -177,16 +177,19 @@
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
         TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, lcManagerProvider, primarySplitProvider, primaryTypeTraits, primaryComparatorFactories,
-                primaryBloomFilterKeyFields, fieldPermutation, 0.7f, true, 1000L, true, dataflowHelperFactory,
-                NoOpOperationCallbackFactory.INSTANCE);
+                primaryRecDesc, storageManager, lcManagerProvider, primarySplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, primaryBloomFilterKeyFields, fieldPermutation, 0.7f, true, 1000L, true,
+                dataflowHelperFactory);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
 
+        NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, nsOpDesc, NC1_ID);
+
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
-
         spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, primaryBtreeBulkLoad, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryBtreeBulkLoad, 0, nsOpDesc, 0);
 
-        spec.addRoot(primaryBtreeBulkLoad);
+        spec.addRoot(nsOpDesc);
         runTest(spec);
     }
 
@@ -240,16 +243,20 @@
         // load secondary index
         int[] fieldPermutation = { 3, 0 };
         TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, lcManagerProvider, secondarySplitProvider, secondaryTypeTraits,
+                secondaryRecDesc, storageManager, lcManagerProvider, secondarySplitProvider, secondaryTypeTraits,
                 secondaryComparatorFactories, secondaryBloomFilterKeyFields, fieldPermutation, 0.7f, true, 1000L, true,
-                dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+                dataflowHelperFactory);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeBulkLoad, NC1_ID);
 
+        NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, nsOpDesc, NC1_ID);
+
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryBtreeSearchOp, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), primaryBtreeSearchOp, 0, sorter, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, secondaryBtreeBulkLoad, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBtreeBulkLoad, 0, nsOpDesc, 0);
 
-        spec.addRoot(secondaryBtreeBulkLoad);
+        spec.addRoot(nsOpDesc);
         runTest(spec);
     }
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
index cfe6db4..2aa037d 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -51,6 +51,7 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
@@ -208,9 +209,8 @@
     private IOperatorDescriptor createPrimaryBulkLoadOp(JobSpecification spec) {
         int[] fieldPermutation = { 0, 1 };
         TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
-                primaryComparatorFactories, null, fieldPermutation, 0.7f, true, 1000L, true,
-                btreeDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+                primaryRecDesc, storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, null, fieldPermutation, 0.7f, true, 1000L, true, btreeDataflowHelperFactory);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
         return primaryBtreeBulkLoad;
     }
@@ -249,8 +249,10 @@
         // before bulk load.
         IOperatorDescriptor fileScanOp = createFileScanOp(spec);
         IOperatorDescriptor primaryBulkLoad = createPrimaryBulkLoadOp(spec);
+        NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
         spec.connect(new OneToOneConnectorDescriptor(spec), fileScanOp, 0, primaryBulkLoad, 0);
-        spec.addRoot(primaryBulkLoad);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryBulkLoad, 0, nsOpDesc, 0);
+        spec.addRoot(nsOpDesc);
         runTest(spec);
     }
 
@@ -277,16 +279,16 @@
 
     private IOperatorDescriptor createBinaryTokenizerOp(JobSpecification spec, int docField, int[] keyFields) {
         BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
-                tokenizerRecDesc, tokenizerFactory, docField, keyFields, addNumTokensKey());
+                tokenizerRecDesc, tokenizerFactory, docField, keyFields, addNumTokensKey(), false);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
         return binaryTokenizer;
     }
 
     private IOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec, int[] fieldPermutation) {
         LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
-                spec, fieldPermutation, true, 1000L, true, storageManager, btreeFileSplitProvider, lcManagerProvider,
-                tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
-                tokenizerFactory, invertedIndexDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+                spec, tokenizerRecDesc, fieldPermutation, true, 1000L, true, storageManager, btreeFileSplitProvider,
+                lcManagerProvider, tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits,
+                invListsComparatorFactories, tokenizerFactory, invertedIndexDataflowHelperFactory);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexBulkLoadOp, NC1_ID);
         return invIndexBulkLoadOp;
     }
@@ -306,11 +308,14 @@
         }
         IOperatorDescriptor externalSortOp = createExternalSortOp(spec, sortFields, tokenizerRecDesc);
         IOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec, fieldPermutation);
+        NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, nsOpDesc, NC1_ID);
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, binaryTokenizerOp, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), binaryTokenizerOp, 0, externalSortOp, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), externalSortOp, 0, invIndexBulkLoadOp, 0);
-        spec.addRoot(invIndexBulkLoadOp);
+        spec.connect(new OneToOneConnectorDescriptor(spec), invIndexBulkLoadOp, 0, nsOpDesc, 0);
+        spec.addRoot(nsOpDesc);
         runTest(spec);
     }
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
index ba00691..268c3cb 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
@@ -85,7 +85,7 @@
                 tokenFactory);
         int[] keyFields = { 0 };
         BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
-                tokenizerRecDesc, tokenizerFactory, 1, keyFields, addNumTokensKey);
+                tokenizerRecDesc, tokenizerFactory, 1, keyFields, addNumTokensKey, false);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 2bdf652..8a335b5 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -241,16 +241,21 @@
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
         TreeIndexBulkLoadOperatorDescriptor primaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, lcManagerProvider, primarySplitProvider, primaryTypeTraits, primaryComparatorFactories,
-                null, fieldPermutation, 0.7f, false, 1000L, true, btreeDataflowHelperFactory,
-                NoOpOperationCallbackFactory.INSTANCE);
+                primaryRecDesc, storageManager, lcManagerProvider, primarySplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, null, fieldPermutation, 0.7f, false, 1000L, true,
+                btreeDataflowHelperFactory);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBulkLoad, NC1_ID);
 
+        NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, nsOpDesc, NC1_ID);
+
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, primaryBulkLoad, 0);
 
-        spec.addRoot(primaryBulkLoad);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryBulkLoad, 0, nsOpDesc, 0);
+
+        spec.addRoot(nsOpDesc);
         runTest(spec);
     }
 
@@ -298,15 +303,19 @@
         // load secondary index
         int[] fieldPermutation = { 6, 7, 8, 9, 0 };
         TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, lcManagerProvider, secondarySplitProvider, secondaryTypeTraits,
+                secondaryRecDesc, storageManager, lcManagerProvider, secondarySplitProvider, secondaryTypeTraits,
                 secondaryComparatorFactories, null, fieldPermutation, 0.7f, false, 1000L, true,
-                rtreeDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+                rtreeDataflowHelperFactory);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBulkLoad, NC1_ID);
 
+        NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, nsOpDesc, NC1_ID);
+
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), primarySearchOp, 0, secondaryBulkLoad, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoad, 0, nsOpDesc, 0);
 
-        spec.addRoot(secondaryBulkLoad);
+        spec.addRoot(nsOpDesc);
         runTest(spec);
     }
 
diff --git a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
index 45b24c5..e46e685 100644
--- a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
+++ b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 81c13a1..9f3c5dd 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,14 +21,16 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 
-public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+public class IndexBulkLoadOperatorNodePushable extends
+        AbstractUnaryInputUnaryOutputOperatorNodePushable {
     protected final IIndexOperatorDescriptor opDesc;
     protected final IHyracksTaskContext ctx;
     protected final float fillFactor;
@@ -42,46 +44,57 @@
     protected IRecordDescriptorProvider recDescProvider;
     protected PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
 
-    public IndexBulkLoadOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
-            int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
-            boolean checkIfEmptyIndex, IRecordDescriptorProvider recordDescProvider) {
+    public IndexBulkLoadOperatorNodePushable(IIndexOperatorDescriptor opDesc,
+            IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
+            float fillFactor, boolean verifyInput, long numElementsHint,
+            boolean checkIfEmptyIndex,
+            IRecordDescriptorProvider recordDescProvider) {
         this.opDesc = opDesc;
         this.ctx = ctx;
-        this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
+        this.indexHelper = opDesc.getIndexDataflowHelperFactory()
+                .createIndexDataflowHelper(opDesc, ctx, partition);
         this.fillFactor = fillFactor;
         this.verifyInput = verifyInput;
         this.numElementsHint = numElementsHint;
         this.checkIfEmptyIndex = checkIfEmptyIndex;
         this.recDescProvider = recordDescProvider;
         tuple.setFieldPermutation(fieldPermutation);
+
     }
 
     @Override
     public void open() throws HyracksDataException {
-        RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(
+                opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(ctx.getFrameSize(), recDesc);
         indexHelper.open();
         index = indexHelper.getIndexInstance();
         try {
-            bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+            bulkLoader = index.createBulkLoader(fillFactor, verifyInput,
+                    numElementsHint, checkIfEmptyIndex);
         } catch (Exception e) {
             indexHelper.close();
             throw new HyracksDataException(e);
         }
+        writer.open();
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         int tupleCount = accessor.getTupleCount();
+
         for (int i = 0; i < tupleCount; i++) {
             tuple.reset(accessor, i);
+
             try {
                 bulkLoader.add(tuple);
             } catch (IndexException e) {
                 throw new HyracksDataException(e);
             }
         }
+        FrameUtils.flushFrame(buffer, writer);
+
     }
 
     @Override
@@ -93,6 +106,7 @@
         } finally {
             indexHelper.close();
         }
+        writer.close();
     }
 
     @Override
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
index e8db013..92e178a 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,10 +20,10 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
@@ -38,17 +38,16 @@
     private final long numElementsHint;
     private final boolean checkIfEmptyIndex;
 
-    public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec,
+    public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
             IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
             float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
-            IIndexDataflowHelperFactory dataflowHelperFactory,
-            IModificationOperationCallbackFactory modificationOpCallbackFactory) {
-        super(spec, 1, 0, null, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, null, false,
-                false, null,
-                NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackFactory);
+            IIndexDataflowHelperFactory dataflowHelperFactory) {
+        super(spec, 1, 1, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, null, false, false, null,
+                NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+                NoOpOperationCallbackFactory.INSTANCE);
         this.fieldPermutation = fieldPermutation;
         this.fillFactor = fillFactor;
         this.verifyInput = verifyInput;
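
With the callback-factory parameter gone and a RecordDescriptor added for the frames the operator now forwards, a call to the new constructor looks as follows (argument values taken from the primary-index example earlier in this change; the inline labels name the corresponding constructor parameters):

    TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, recDesc,
            storageManager, lcManagerProvider, btreeSplitProvider, typeTraits, comparatorFactories,
            null /* bloomFilterKeyFields */, fieldPermutation,
            0.7f /* fillFactor */, false /* verifyInput */, 1000L /* numElementsHint */,
            true /* checkIfEmptyIndex */, dataflowHelperFactory);
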
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
index 71e2b9f..d23d484 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -37,10 +37,10 @@
     protected static final int nextPageOff = levelOff + 1; // 21
     protected static final int validOff = nextPageOff + 4; // 25
 
-    // The additionalFilteringPageOff is used only for LSM indexes. 
+    // The additionalFilteringPageOff is used only for LSM indexes.
     // We store the page id that will hold the information of the filter associated with a disk component.
     // It is only set in the first meta page; other meta pages (i.e., with level -2) have junk in the max page field.
-    private static final int additionalFilteringPageOff = validOff + 4; // 29 
+    private static final int additionalFilteringPageOff = validOff + 4; // 29
     protected static final int lsnOff = additionalFilteringPageOff + 4; // 33
 
     protected ICachedPage page = null;
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractSearchPredicate.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractSearchPredicate.java
index 5a60b09..c33eaab 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractSearchPredicate.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractSearchPredicate.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/frames/LSMComponentFilterFrame.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/frames/LSMComponentFilterFrame.java
index e2b5584..85c293d 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/frames/LSMComponentFilterFrame.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/frames/LSMComponentFilterFrame.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,7 +27,7 @@
 
     // This page consists of two tuples that represents the minimum and maximum tuples in an LSM component.
 
-    // A-one byte to indicate whether the filter tuples were set yet. 
+    // A one-byte flag indicating whether the filter tuples have been set yet.
     private static final int minTupleIsSetIndicatorOff = 0;
     private static final int maxTupleIsSetIndicatorOff = 1;
 
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
index e2c31aa..682338c 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,7 +27,7 @@
         this.filter = filter;
         readerCount = 0;
     }
-    
+
     public AbstractLSMComponent() {
         this(null);
     }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
index bd54769..7ce8907 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -54,7 +54,7 @@
         int fileId = treeIndex.getFileId();
         ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame();
 
-        // Read the filter page from the first metadata page of the tree. 
+        // Read the filter page from the first metadata page of the tree.
     // If it has not been created yet, then create a new one.
         int metadataPageId = treeIndex.getFreePageManager().getFirstMetadataPage();
         ICachedPage metadataPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, metadataPageId), false);
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
index 6ccfed3..648c51a 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -38,15 +38,20 @@
     // Indicates whether the first key field should be the number of tokens in the tokenized set of the document.
     // This value is used in partitioned inverted indexes, for example.
     private final boolean addNumTokensKey;
+    // Indicates the order in which fields are written.
+    // True:  [keyfield1, ..., keyfieldN, token, number of tokens (if a partitioned index)]
+    // False: [token, number of tokens (if a partitioned index), keyfield1, keyfield2, ...]
+    private final boolean writeKeyFieldsFirst;
 
     public BinaryTokenizerOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
-            IBinaryTokenizerFactory tokenizerFactory, int docField, int[] keyFields, boolean addNumTokensKey) {
+            IBinaryTokenizerFactory tokenizerFactory, int docField, int[] keyFields, boolean addNumTokensKey, boolean writeKeyFieldsFirst) {
         super(spec, 1, 1);
         this.tokenizerFactory = tokenizerFactory;
         this.docField = docField;
         this.keyFields = keyFields;
         this.addNumTokensKey = addNumTokensKey;
         recordDescriptors[0] = recDesc;
+        this.writeKeyFieldsFirst = writeKeyFieldsFirst;
     }
 
     @Override
@@ -54,6 +59,6 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new BinaryTokenizerOperatorNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(
                 getActivityId(), 0), recordDescriptors[0], tokenizerFactory.createTokenizer(), docField, keyFields,
-                addNumTokensKey);
+                addNumTokensKey, writeKeyFieldsFirst);
     }
 }
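
Call sites choose the output field order through the new writeKeyFieldsFirst flag; every caller updated in this change passes false, which preserves the previous [token, (number of tokens,) key fields...] layout. A call sketch mirroring the test update earlier in this change:

    BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
            tokenizerRecDesc, tokenizerFactory, docField, keyFields,
            addNumTokensKey, false /* writeKeyFieldsFirst */);
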
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index ad2022d..8598b70 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,9 +15,11 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -25,18 +27,22 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
 
-public class BinaryTokenizerOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+public class BinaryTokenizerOperatorNodePushable extends
+        AbstractUnaryInputUnaryOutputOperatorNodePushable {
 
     private final IHyracksTaskContext ctx;
     private final IBinaryTokenizer tokenizer;
     private final int docField;
     private final int[] keyFields;
     private final boolean addNumTokensKey;
+    private final boolean writeKeyFieldsFirst;
     private final RecordDescriptor inputRecDesc;
     private final RecordDescriptor outputRecDesc;
 
@@ -46,9 +52,10 @@
     private FrameTupleAppender appender;
     private ByteBuffer writeBuffer;
 
-    public BinaryTokenizerOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
-            RecordDescriptor outputRecDesc, IBinaryTokenizer tokenizer, int docField, int[] keyFields,
-            boolean addNumTokensKey) {
+    public BinaryTokenizerOperatorNodePushable(IHyracksTaskContext ctx,
+            RecordDescriptor inputRecDesc, RecordDescriptor outputRecDesc,
+            IBinaryTokenizer tokenizer, int docField, int[] keyFields,
+            boolean addNumTokensKey, boolean writeKeyFieldsFirst) {
         this.ctx = ctx;
         this.tokenizer = tokenizer;
         this.docField = docField;
@@ -56,6 +63,7 @@
         this.addNumTokensKey = addNumTokensKey;
         this.inputRecDesc = inputRecDesc;
         this.outputRecDesc = outputRecDesc;
+        this.writeKeyFieldsFirst = writeKeyFieldsFirst;
     }
 
     @Override
@@ -73,14 +81,17 @@
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         int tupleCount = accessor.getTupleCount();
+
         for (int i = 0; i < tupleCount; i++) {
             short numTokens = 0;
             if (addNumTokensKey) {
                 // Run through the tokens to get the total number of tokens.
                 tokenizer.reset(
                         accessor.getBuffer().array(),
-                        accessor.getTupleStartOffset(i) + accessor.getFieldSlotsLength()
-                                + accessor.getFieldStartOffset(i, docField), accessor.getFieldLength(i, docField));
+                        accessor.getTupleStartOffset(i)
+                                + accessor.getFieldSlotsLength()
+                                + accessor.getFieldStartOffset(i, docField),
+                        accessor.getFieldLength(i, docField));
                 while (tokenizer.hasNext()) {
                     tokenizer.next();
                     numTokens++;
@@ -89,38 +100,81 @@
 
             tokenizer.reset(
                     accessor.getBuffer().array(),
-                    accessor.getTupleStartOffset(i) + accessor.getFieldSlotsLength()
-                            + accessor.getFieldStartOffset(i, docField), accessor.getFieldLength(i, docField));
+                    accessor.getTupleStartOffset(i)
+                            + accessor.getFieldSlotsLength()
+                            + accessor.getFieldStartOffset(i, docField),
+                    accessor.getFieldLength(i, docField));
+
+            // Write the token and the key fields into the output frame in the
+            // order specified by the writeKeyFieldsFirst flag.
             while (tokenizer.hasNext()) {
+
                 tokenizer.next();
 
                 builder.reset();
-                try {
-                    IToken token = tokenizer.getToken();
-                    token.serializeToken(builderData);
-                    builder.addFieldEndOffset();
-                    // Add number of tokens if requested.
-                    if (addNumTokensKey) {
-                        builder.getDataOutput().writeShort(numTokens);
+
+                // Writing order: token, number of tokens, keyfield1 ... n
+                if (!writeKeyFieldsFirst) {
+                    try {
+                        IToken token = tokenizer.getToken();
+                        token.serializeToken(builderData);
+
                         builder.addFieldEndOffset();
+                        // Add number of tokens if requested.
+                        if (addNumTokensKey) {
+                            builder.getDataOutput().writeShort(numTokens);
+                            builder.addFieldEndOffset();
+                        }
+                    } catch (IOException e) {
+                        throw new HyracksDataException(e.getMessage());
                     }
-                } catch (IOException e) {
-                    throw new HyracksDataException(e.getMessage());
+
+                    for (int k = 0; k < keyFields.length; k++) {
+                        builder.addField(accessor, i, keyFields[k]);
+                    }
+
+                }
+                // Writing order: keyfield1 ... n, token, number of tokens
+                else {
+
+                    for (int k = 0; k < keyFields.length; k++) {
+                        builder.addField(accessor, i, keyFields[k]);
+                    }
+
+                    try {
+                        IToken token = tokenizer.getToken();
+                        token.serializeToken(builderData);
+
+                        builder.addFieldEndOffset();
+                        // Add number of tokens if requested.
+                        if (addNumTokensKey) {
+                            builder.getDataOutput().writeShort(numTokens);
+                            builder.addFieldEndOffset();
+                        }
+                    } catch (IOException e) {
+                        throw new HyracksDataException(e.getMessage());
+                    }
+
                 }
 
-                for (int k = 0; k < keyFields.length; k++) {
-                    builder.addField(accessor, i, keyFields[k]);
-                }
-
-                if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+                if (!appender.append(builder.getFieldEndOffsets(),
+                        builder.getByteArray(), 0, builder.getSize())) {
                     FrameUtils.flushFrame(writeBuffer, writer);
                     appender.reset(writeBuffer, true);
-                    if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
-                        throw new HyracksDataException("Record size (" + builder.getSize() +") larger than frame size (" + appender.getBuffer().capacity() + ")");
+
+                    if (!appender.append(builder.getFieldEndOffsets(),
+                            builder.getByteArray(), 0, builder.getSize())) {
+                        throw new HyracksDataException("Record size ("
+                                + builder.getSize()
+                                + ") larger than frame size ("
+                                + appender.getBuffer().capacity() + ")");
                     }
                 }
+
             }
+
         }
+
     }
 
     @Override
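
The per-token output layout produced by the loop above can be summarized with a small standalone sketch (not part of the patch); it mirrors the writeKeyFieldsFirst / addNumTokensKey branching:

    import java.util.ArrayList;
    import java.util.List;

    public class TokenTupleLayoutSketch {
        // Returns the field order of each tuple the tokenizer pushable emits per token.
        public static List<String> fieldOrder(int numKeyFields, boolean addNumTokensKey,
                boolean writeKeyFieldsFirst) {
            List<String> keys = new ArrayList<String>();
            for (int k = 0; k < numKeyFields; k++) {
                keys.add("keyField" + (k + 1));
            }
            List<String> fields = new ArrayList<String>();
            if (writeKeyFieldsFirst) {
                fields.addAll(keys);        // [keyField1..N, token, numTokens?]
            }
            fields.add("token");
            if (addNumTokensKey) {
                fields.add("numTokens");    // short value, only for partitioned indexes
            }
            if (!writeKeyFieldsFirst) {
                fields.addAll(keys);        // [token, numTokens?, keyField1..N]
            }
            return fields;
        }
    }

For example, fieldOrder(1, true, false) returns [token, numTokens, keyField1].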
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
index 1f3ed76..10ad7fe 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,10 +20,10 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -40,18 +40,18 @@
     private final long numElementsHint;
     private final boolean checkIfEmptyIndex;
 
-    public LSMInvertedIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] fieldPermutation,
-            boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
+    public LSMInvertedIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
+            int[] fieldPermutation, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
             IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider,
             IIndexLifecycleManagerProvider lifecycleManagerProvider, ITypeTraits[] tokenTypeTraits,
             IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
             IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
-            IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory,
-            IModificationOperationCallbackFactory modificationOpCallbackFactory) {
-        super(spec, 1, 0, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
+            IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory) {
+        super(spec, 1, 1, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
-                invertedIndexDataflowHelperFactory, null, false, false,
-                null, NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackFactory);
+                invertedIndexDataflowHelperFactory, null, false, false, null,
+                NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+                NoOpOperationCallbackFactory.INSTANCE);
         this.fieldPermutation = fieldPermutation;
         this.verifyInput = verifyInput;
         this.numElementsHint = numElementsHint;
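
With the new signature, callers of this descriptor pass the record descriptor of the tuples being loaded (the operator is now declared 1-in/1-out so it forwards them downstream) and no longer supply a modification-callback factory. A minimal caller sketch, assuming all arguments are produced elsewhere and the flag/hint values are illustrative only:

    import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
    import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
    import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
    import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
    import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
    import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
    import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
    import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
    import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
    import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;

    public class InvIndexBulkLoadSketch {
        public static LSMInvertedIndexBulkLoadOperatorDescriptor create(IOperatorDescriptorRegistry spec,
                RecordDescriptor recDesc, int[] fieldPermutation, IStorageManagerInterface storageManager,
                IFileSplitProvider fileSplitProvider, IIndexLifecycleManagerProvider lifecycleManagerProvider,
                ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenComparatorFactories,
                ITypeTraits[] invListsTypeTraits, IBinaryComparatorFactory[] invListComparatorFactories,
                IBinaryTokenizerFactory tokenizerFactory, IIndexDataflowHelperFactory dataflowHelperFactory) {
            boolean verifyInput = false;      // assumption: skip per-tuple ordering checks
            long numElementsHint = 1000L;     // assumption: bloom-filter sizing hint
            boolean checkIfEmptyIndex = true; // assumption: require an empty index before loading
            return new LSMInvertedIndexBulkLoadOperatorDescriptor(spec, recDesc, fieldPermutation,
                    verifyInput, numElementsHint, checkIfEmptyIndex, storageManager, fileSplitProvider,
                    lifecycleManagerProvider, tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits,
                    invListComparatorFactories, tokenizerFactory, dataflowHelperFactory);
        }
    }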
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 0c96324..462df4d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -70,6 +70,7 @@
 import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
@@ -575,17 +576,21 @@
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, BF_HINT, false,
-                getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
+                recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, BF_HINT, false,
+                getIndexDataflowHelperFactory());
         setLocationConstraint(spec, btreeBulkLoad);
 
+        NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+        setLocationConstraint(spec, nsOpDesc);
+
         /**
          * connect operator descriptors
          */
         ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
         spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), btreeBulkLoad, 0, nsOpDesc, 0);
         spec.setFrameSize(frameSize);
         return spec;
     }
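
Because the bulk-load operator now produces an output stream, jobs that end in a bulk load are terminated with a NullSinkOperatorDescriptor, as done above and in JoinTest below. A minimal sketch of the wiring pattern (not part of the patch; location constraints are set on the sink the same way as on the bulk-load operator):

    import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
    import edu.uci.ics.hyracks.api.job.JobSpecification;
    import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
    import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;

    public class BulkLoadSinkWiringSketch {
        // Appends a null sink after a bulk-load operator and makes the sink the job root.
        public static NullSinkOperatorDescriptor terminateWithNullSink(JobSpecification spec,
                IOperatorDescriptor bulkLoad) {
            NullSinkOperatorDescriptor sink = new NullSinkOperatorDescriptor(spec);
            spec.connect(new OneToOneConnectorDescriptor(spec), bulkLoad, 0, sink, 0);
            spec.addRoot(sink);
            return sink;
        }
    }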
diff --git a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index 5478ed9..c600c93 100644
--- a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -48,6 +48,7 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
@@ -280,18 +281,20 @@
         ITypeTraits[] typeTraits = new ITypeTraits[custDesc.getFields().length];
         for (int i = 0; i < typeTraits.length; i++)
             typeTraits[i] = new TypeTraits(false);
-        TreeIndexBulkLoadOperatorDescriptor writer = new TreeIndexBulkLoadOperatorDescriptor(spec,
+        TreeIndexBulkLoadOperatorDescriptor writer = new TreeIndexBulkLoadOperatorDescriptor(spec, custDesc,
                 storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
-                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 100000, false, new BTreeDataflowHelperFactory(),
-                NoOpOperationCallbackFactory.INSTANCE);
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 100000, false, new BTreeDataflowHelperFactory());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
 
+        NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, nsOpDesc, NC1_ID, NC2_ID);
+
         spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
                 sortFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
                 nmkFactory), sorter, 0, writer, 0);
-
-        spec.addRoot(writer);
+        spec.connect(new OneToOneConnectorDescriptor(spec), writer, 0, nsOpDesc, 0);
+        spec.addRoot(nsOpDesc);
         runTest(spec);
     }