ASTERIXDB-1238: Refactor AqlMetadataProvider

Change-Id: If2720817c5659622e1f713653856825d612eb892
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1016
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index 72360b6..0975163 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -68,17 +68,14 @@
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetCardinalityHint;
-import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.DatasourceAdapter;
-import org.apache.asterix.metadata.entities.Datatype;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
@@ -123,11 +120,9 @@
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.core.jobgen.impl.OperatorSchemaImpl;
 import org.apache.hyracks.algebricks.data.IAWriterFactory;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
 import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
@@ -174,23 +169,27 @@
 
 public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
 
+    private final AsterixStorageProperties storageProperties;
+    private final ILibraryManager libraryManager;
+    private final Dataverse defaultDataverse;
+
     private MetadataTransactionContext mdTxnCtx;
     private boolean isWriteTransaction;
-    private final Map<String, String[]> stores;
     private Map<String, String> config;
     private IAWriterFactory writerFactory;
     private FileSplit outputFile;
     private boolean asyncResults;
     private ResultSetId resultSetId;
     private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
-
-    private final Dataverse defaultDataverse;
     private JobId jobId;
     private Map<String, Integer> locks;
     private boolean isTemporaryDatasetWriteJob = true;
 
-    private final AsterixStorageProperties storageProperties;
-    private final ILibraryManager libraryManager;
+    public AqlMetadataProvider(Dataverse defaultDataverse) {
+        this.defaultDataverse = defaultDataverse;
+        this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        this.libraryManager = AsterixAppContextInfo.getInstance().getLibraryManager();
+    }
 
     public String getPropertyValue(String propertyName) {
         return config.get(propertyName);
@@ -200,21 +199,10 @@
         this.config = config;
     }
 
-    public Map<String, String[]> getAllStores() {
-        return stores;
-    }
-
     public Map<String, String> getConfig() {
         return config;
     }
 
-    public AqlMetadataProvider(Dataverse defaultDataverse) {
-        this.defaultDataverse = defaultDataverse;
-        this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
-        this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
-        this.libraryManager = AsterixAppContextInfo.getInstance().getLibraryManager();
-    }
-
     public ILibraryManager getLibraryManager() {
         return libraryManager;
     }
@@ -283,35 +271,6 @@
         return resultSerializerFactoryProvider;
     }
 
-    /**
-     * Retrieve the Output RecordType, as defined by "set output-record-type".
-     */
-    public ARecordType findOutputRecordType() throws AlgebricksException {
-        String outputRecordType = getPropertyValue("output-record-type");
-        if (outputRecordType == null) {
-            return null;
-        }
-        String dataverse = getDefaultDataverseName();
-        if (dataverse == null) {
-            throw new AlgebricksException("Cannot declare output-record-type with no dataverse!");
-        }
-        IAType type = findType(dataverse, outputRecordType);
-        if (!(type instanceof ARecordType)) {
-            throw new AlgebricksException("Type " + outputRecordType + " is not a record type!");
-        }
-        return (ARecordType) type;
-    }
-
-    @Override
-    public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
-        AqlSourceId aqlId = id;
-        try {
-            return lookupSourceInMetadata(aqlId);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
     public boolean isWriteTransaction() {
         // The transaction writes persistent datasets.
         return isWriteTransaction;
@@ -322,6 +281,98 @@
         return isTemporaryDatasetWriteJob;
     }
 
+    public IDataFormat getFormat() {
+        return FormatUtils.getDefaultFormat();
+    }
+
+    public AsterixStorageProperties getStorageProperties() {
+        return storageProperties;
+    }
+
+    public Map<String, Integer> getLocks() {
+        return locks;
+    }
+
+    public void setLocks(Map<String, Integer> locks) {
+        this.locks = locks;
+    }
+
+    /**
+     * Retrieve the Output RecordType, as defined by "set output-record-type".
+     */
+    public ARecordType findOutputRecordType() throws AlgebricksException {
+        return MetadataManagerUtil.findOutputRecordType(mdTxnCtx, getDefaultDataverseName(),
+                getPropertyValue("output-record-type"));
+    }
+
+    public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
+        String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName())
+                : dataverse;
+        if (dv == null) {
+            return null;
+        }
+        return MetadataManagerUtil.findDataset(mdTxnCtx, dv, dataset);
+    }
+
+    public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException {
+        return MetadataManagerUtil.findNodeDomain(mdTxnCtx, nodeGroupName);
+    }
+
+    public IAType findType(String dataverse, String typeName) throws AlgebricksException {
+        return MetadataManagerUtil.findType(mdTxnCtx, dataverse, typeName);
+    }
+
+    public Feed findFeed(String dataverse, String feedName) throws AlgebricksException {
+        return MetadataManagerUtil.findFeed(mdTxnCtx, dataverse, feedName);
+    }
+
+    public FeedPolicyEntity findFeedPolicy(String dataverse, String policyName) throws AlgebricksException {
+        return MetadataManagerUtil.findFeedPolicy(mdTxnCtx, dataverse, policyName);
+    }
+
+    @Override
+    public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
+        return MetadataManagerUtil.findDataSource(mdTxnCtx, id);
+    }
+
+    public AqlDataSource lookupSourceInMetadata(AqlSourceId aqlId) throws AlgebricksException {
+        return MetadataManagerUtil.lookupSourceInMetadata(mdTxnCtx, aqlId);
+    }
+
+    @Override
+    public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
+            throws AlgebricksException {
+        AqlDataSource ads = findDataSource(dataSourceId);
+        Dataset dataset = ((DatasetDataSource) ads).getDataset();
+        try {
+            String indexName = indexId;
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+            if (secondaryIndex != null) {
+                return new AqlIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
+            } else {
+                Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                        dataset.getDatasetName(), dataset.getDatasetName());
+                if (primaryIndex.getIndexName().equals(indexId)) {
+                    return new AqlIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
+                } else {
+                    return null;
+                }
+            }
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
+        return MetadataManagerUtil.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+    }
+
+    @Override
+    public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
+        return AsterixBuiltinFunctions.lookupFunction(fid);
+    }
+
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
             IDataSource<AqlSourceId> dataSource, List<LogicalVariable> scanVariables,
@@ -337,8 +388,7 @@
         }
     }
 
-    public static AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource)
-            throws AsterixException {
+    public static AlgebricksAbsolutePartitionConstraint determineLocationConstraint(FeedDataSource feedDataSource) {
         return new AlgebricksAbsolutePartitionConstraint(feedDataSource.getLocations());
     }
 
@@ -367,64 +417,9 @@
         return format;
     }
 
-    protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
-            Map<String, String> configuration, ARecordType itemType, boolean isPKAutoGenerated,
-            List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException {
-        try {
-            configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName());
-            IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName,
-                    configuration, itemType, metaType);
-
-            // check to see if dataset is indexed
-            Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(),
-                    dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
-
-            if (filesIndex != null && filesIndex.getPendingOp() == 0) {
-                // get files
-                List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
-                Iterator<ExternalFile> iterator = files.iterator();
-                while (iterator.hasNext()) {
-                    if (iterator.next().getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
-                        iterator.remove();
-                    }
-                }
-                // TODO Check this call, result of merge from master!
-                // ((IGenericAdapterFactory) adapterFactory).setFiles(files);
-            }
-
-            return adapterFactory;
-        } catch (Exception e) {
-            throw new AlgebricksException("Unable to create adapter", e);
-        }
-    }
-
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
-            JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
-            throws AlgebricksException {
-        if (itemType.getTypeTag() != ATypeTag.RECORD) {
-            throw new AlgebricksException("Can only scan datasets of records.");
-        }
-
-        ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
-        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-
-        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc,
-                adapterFactory);
-
-        AlgebricksPartitionConstraint constraint;
-        try {
-            constraint = adapterFactory.getPartitionConstraint();
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
-    }
-
     public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
             JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception {
-        Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput = null;
+        Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput;
         factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx,
                 libraryManager);
         ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, primaryFeed.getAdapterConfiguration(),
@@ -445,8 +440,7 @@
         }
 
         AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint();
-        return new Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory>(feedIngestor,
-                partitionConstraint, adapterFactory);
+        return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
@@ -454,7 +448,6 @@
             JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
             int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
             Object implConfig, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
-
         boolean isSecondary = true;
         int numSecondaryKeys = 0;
         try {
@@ -496,10 +489,10 @@
                 }
                 Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
                         getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
-                                secondaryIndex.getIndexType(), secondaryIndex.getKeyFieldNames(),
-                                secondaryIndex.getKeyFieldTypes(), DatasetUtils.getPartitioningKeys(dataset), itemType,
-                                dataset.getDatasetType(), dataset.hasMetaPart(), primaryKeyIndicators,
-                                secondaryIndex.getKeyFieldSourceIndicators(), metaType);
+                        secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(),
+                        DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(),
+                        dataset.hasMetaPart(), primaryKeyIndicators, secondaryIndex.getKeyFieldSourceIndicators(),
+                        metaType);
                 comparatorFactories = comparatorFactoriesAndTypeTraits.first;
                 typeTraits = comparatorFactoriesAndTypeTraits.second;
                 if (filterTypeTraits != null) {
@@ -519,22 +512,18 @@
                 // get meta item type
                 ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
                 typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
-                comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType,
-                        metaItemType, context.getBinaryComparatorFactoryProvider());
+                comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType,
+                        context.getBinaryComparatorFactoryProvider());
                 filterFields = DatasetUtils.createFilterFields(dataset);
                 btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
             }
 
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-            try {
-                spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
-                        dataset.getDatasetName(), indexName, temp);
-            } catch (Exception e) {
-                throw new AlgebricksException(e);
-            }
+            spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), dataset.getDatasetName(),
+                    indexName, temp);
 
-            ISearchOperationCallbackFactory searchCallbackFactory = null;
+            ISearchOperationCallbackFactory searchCallbackFactory;
             if (isSecondary) {
                 searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
                         : new SecondaryIndexSearchOperationCallbackFactory();
@@ -580,83 +569,29 @@
                 int[] buddyBreeFields = new int[] { numSecondaryKeys };
                 ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory =
                         new ExternalBTreeWithBuddyDataflowHelperFactory(
-                                compactionInfo.first, compactionInfo.second,
-                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                                LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
-                                getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
-                                ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
+                        compactionInfo.first, compactionInfo.second,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+                        getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
+                        ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
                 btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
                         rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
                         highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
                         retainMissing, context.getMissingWriterFactory(), searchCallbackFactory);
             }
-
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
-
+            return new Pair<>(btreeSearchOp, spPc.second);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
         }
     }
 
-    private Pair<IBinaryComparatorFactory[], ITypeTraits[]> getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
-            IndexType indexType, List<List<String>> sidxKeyFieldNames, List<IAType> sidxKeyFieldTypes,
-            List<List<String>> pidxKeyFieldNames, ARecordType recType, DatasetType dsType, boolean hasMeta,
-            List<Integer> primaryIndexKeyIndicators, List<Integer> secondaryIndexIndicators, ARecordType metaType)
-            throws AlgebricksException {
-
-        IBinaryComparatorFactory[] comparatorFactories;
-        ITypeTraits[] typeTraits;
-        int sidxKeyFieldCount = sidxKeyFieldNames.size();
-        int pidxKeyFieldCount = pidxKeyFieldNames.size();
-        typeTraits = new ITypeTraits[sidxKeyFieldCount + pidxKeyFieldCount];
-        comparatorFactories = new IBinaryComparatorFactory[sidxKeyFieldCount + pidxKeyFieldCount];
-
-        int i = 0;
-        for (; i < sidxKeyFieldCount; ++i) {
-            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i),
-                    sidxKeyFieldNames.get(i),
-                    (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
-            IAType keyType = keyPairType.first;
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                    true);
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-        }
-
-        for (int j = 0; j < pidxKeyFieldCount; ++j, ++i) {
-            IAType keyType = null;
-            try {
-                switch (dsType) {
-                    case INTERNAL:
-                        keyType = (hasMeta && primaryIndexKeyIndicators.get(j).intValue() == 1)
-                                ? metaType.getSubFieldType(pidxKeyFieldNames.get(j))
-                                : recType.getSubFieldType(pidxKeyFieldNames.get(j));
-                        break;
-                    case EXTERNAL:
-                        keyType = IndexingConstants.getFieldType(j);
-                        break;
-                    default:
-                        throw new AlgebricksException("Unknown Dataset Type");
-                }
-            } catch (AsterixException e) {
-                throw new AlgebricksException(e);
-            }
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                    true);
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-        }
-
-        return new Pair<IBinaryComparatorFactory[], ITypeTraits[]>(comparatorFactories, typeTraits);
-    }
-
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
             JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
             int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
-
         try {
-            ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(),
-                    dataset.getItemTypeName());
+            ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
             int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
 
             boolean temp = dataset.getDatasetDetails().isTemp();
@@ -670,9 +605,9 @@
             List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
             int numSecondaryKeys = secondaryKeyFields.size();
             if (numSecondaryKeys != 1) {
-                throw new AlgebricksException("Cannot use " + numSecondaryKeys
-                        + " fields as a key for the R-tree index. "
-                        + "There can be only one field as a key for the R-tree index.");
+                throw new AlgebricksException(
+                        "Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. "
+                                + "There can be only one field as a key for the R-tree index.");
             }
             Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
                     secondaryKeyFields.get(0), recType);
@@ -701,8 +636,8 @@
                     numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
+                            dataset.getDatasetName(), indexName, temp);
             ARecordType metaType = null;
             if (dataset.hasMetaPart()) {
                 metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
@@ -770,28 +705,12 @@
                         retainMissing, context.getMissingWriterFactory(), searchCallbackFactory);
             }
 
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
-
+            return new Pair<>(rtreeSearchOp, spPc.second);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
         }
     }
 
-    private IBinaryComparatorFactory[] getMergedComparatorFactories(IBinaryComparatorFactory[] comparatorFactories,
-            IBinaryComparatorFactory[] primaryComparatorFactories) {
-        IBinaryComparatorFactory[] btreeCompFactories = null;
-        int btreeCompFactoriesCount = comparatorFactories.length + primaryComparatorFactories.length;
-        btreeCompFactories = new IBinaryComparatorFactory[btreeCompFactoriesCount];
-        int i = 0;
-        for (; i < comparatorFactories.length; i++) {
-            btreeCompFactories[i] = comparatorFactories[i];
-        }
-        for (int j = 0; i < btreeCompFactoriesCount; i++, j++) {
-            btreeCompFactories[i] = primaryComparatorFactories[j];
-        }
-        return btreeCompFactories;
-    }
-
     @Override
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
@@ -804,7 +723,7 @@
         SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
                 getWriterFactory(), inputDesc);
         AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
-        return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(runtime, apc);
+        return new Pair<>(runtime, apc);
     }
 
     @Override
@@ -814,7 +733,6 @@
         ResultSetDataSink rsds = (ResultSetDataSink) sink;
         ResultSetSinkId rssId = rsds.getId();
         ResultSetId rsId = rssId.getResultSetId();
-
         ResultWriterOperatorDescriptor resultWriter = null;
         try {
             IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
@@ -824,48 +742,7 @@
         } catch (IOException e) {
             throw new AlgebricksException(e);
         }
-
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(resultWriter, null);
-    }
-
-    @Override
-    public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
-            throws AlgebricksException {
-        AqlDataSource ads = findDataSource(dataSourceId);
-        Dataset dataset = ((DatasetDataSource) ads).getDataset();
-
-        try {
-            String indexName = indexId;
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-            if (secondaryIndex != null) {
-                return new AqlIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
-            } else {
-                Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                        dataset.getDatasetName(), dataset.getDatasetName());
-                if (primaryIndex.getIndexName().equals(indexId)) {
-                    return new AqlIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
-                } else {
-                    return null;
-                }
-            }
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
-    }
-
-    public AqlDataSource lookupSourceInMetadata(AqlSourceId aqlId) throws AlgebricksException, MetadataException {
-        Dataset dataset = findDataset(aqlId.getDataverseName(), aqlId.getDatasourceName());
-        if (dataset == null) {
-            throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
-        }
-        IAType itemType = findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
-        IAType metaItemType = findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
-        INodeDomain domain = findNodeDomain(dataset.getNodeGroupName());
-        byte datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL)
-                ? AqlDataSourceType.EXTERNAL_DATASET : AqlDataSourceType.INTERNAL_DATASET;
-        return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, datasourceType,
-                dataset.getDatasetDetails(), domain);
+        return new Pair<>(resultWriter, null);
     }
 
     @Override
@@ -876,18 +753,13 @@
         String dataverseName = dataSource.getId().getDataverseName();
         String datasetName = dataSource.getId().getDatasourceName();
 
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
-        }
-
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
         int numKeys = keys.size();
         int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
 
         // move key fields to front
         int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
         int[] bloomFilterKeyFields = new int[numKeys];
-        // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
         int i = 0;
         for (LogicalVariable varKey : keys) {
             int idx = propagatedSchema.findVariable(varKey);
@@ -923,8 +795,8 @@
                     itemType, metaType, context.getBinaryComparatorFactoryProvider());
 
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(),
+                            datasetName, indexName, temp);
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
 
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -953,114 +825,7 @@
                             LSMBTreeIOOperationCallbackFactory.INSTANCE,
                             storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
                             filterCmpFactories, btreeFields, filterFields, !temp));
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
-                    splitsAndConstraint.second);
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
-    }
-
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp,
-            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload,
-            List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException {
-
-        String datasetName = dataSource.getId().getDatasourceName();
-        Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException(
-                    "Unknown dataset " + datasetName + " in dataverse " + dataSource.getId().getDataverseName());
-        }
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
-        int numKeys = keys.size();
-        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
-        // Move key fields to front.
-        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
-                + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())];
-        int[] bloomFilterKeyFields = new int[numKeys];
-        int i = 0;
-        for (LogicalVariable varKey : keys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            bloomFilterKeyFields[i] = i;
-            i++;
-        }
-        fieldPermutation[i++] = propagatedSchema.findVariable(payload);
-        if (numFilterFields > 0) {
-            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
-            fieldPermutation[i++] = idx;
-        }
-        if (additionalNonFilteringFields != null) {
-            for (LogicalVariable variable : additionalNonFilteringFields) {
-                int idx = propagatedSchema.findVariable(variable);
-                fieldPermutation[i++] = idx;
-            }
-        }
-
-        try {
-            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), dataset.getDatasetName());
-            String indexName = primaryIndex.getIndexName();
-            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype();
-            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
-            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
-
-            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
-                    itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataSource.getId().getDataverseName(), datasetName, indexName, temp);
-
-            // prepare callback
-            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            int[] primaryKeyFields = new int[numKeys];
-            for (i = 0; i < numKeys; i++) {
-                primaryKeyFields[i] = i;
-            }
-
-            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    itemType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = DatasetUtils.createFilterFields(dataset);
-            int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
-
-            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
-            IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
-                    : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
-                            txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, dataset.hasMetaPart());
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
-                    new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
-                    new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                    storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
-                    btreeFields, filterFields, !temp);
-            IOperatorDescriptor op;
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
-            } else {
-                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
-                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                        splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
-                        fieldPermutation, indexOp, idfh, null, true, indexName, null, modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE);
-            }
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
-
+            return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
         }
@@ -1072,7 +837,7 @@
             List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
             List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
             JobSpecification spec, boolean bulkload) throws AlgebricksException {
-        return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, typeEnv, keys, payload,
+        return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, keys, payload,
                 additionalNonKeyFields, recordDesc, context, spec, bulkload, additionalNonFilteringFields);
     }
 
@@ -1081,58 +846,10 @@
             IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
             List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
             RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException {
-        return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, typeEnv, keys, payload,
+        return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, keys, payload,
                 additionalNonKeyFields, recordDesc, context, spec, false, null);
     }
 
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteRuntime(
-            IndexOperation indexOp, IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
-            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
-            JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
-        String indexName = dataSourceIndex.getId();
-        String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
-        String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
-
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        Index secondaryIndex;
-        try {
-            secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-        AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
-        switch (secondaryIndex.getIndexType()) {
-            case BTREE: {
-                return getBTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
-                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
-                        bulkload);
-            }
-            case RTREE: {
-                return getRTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
-                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
-                        bulkload);
-            }
-            case SINGLE_PARTITION_WORD_INVIX:
-            case SINGLE_PARTITION_NGRAM_INVIX:
-            case LENGTH_PARTITIONED_WORD_INVIX:
-            case LENGTH_PARTITIONED_NGRAM_INVIX: {
-                return getInvertedIndexDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
-                        primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec,
-                        indexOp, secondaryIndex.getIndexType(), bulkload);
-            }
-            default: {
-                throw new AlgebricksException(
-                        "Insert and delete not implemented for index type: " + secondaryIndex.getIndexType());
-            }
-        }
-    }
-
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
             IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
@@ -1140,9 +857,34 @@
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
             boolean bulkload) throws AlgebricksException {
-        return getIndexInsertOrDeleteRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema, inputSchemas,
-                typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, context, spec,
-                bulkload);
+        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema,
+                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
+                context, spec, bulkload, null, null);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
+            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+            throws AlgebricksException {
+        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema,
+                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
+                context, spec, false, null, null);
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
+            ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema,
+                inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, recordDesc,
+                context, spec, false, prevSecondaryKeys, prevAdditionalFilteringKey);
     }
 
     @Override
@@ -1156,17 +898,14 @@
         String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
         String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
 
-        IOperatorSchema inputSchema = new OperatorSchemaImpl();
+        IOperatorSchema inputSchema;
         if (inputSchemas.length > 0) {
             inputSchema = inputSchemas[0];
         } else {
             throw new AlgebricksException("TokenizeOperator can not operate without any input variable.");
         }
 
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
         Index secondaryIndex;
         try {
             secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -1174,750 +913,18 @@
         } catch (MetadataException e) {
             throw new AlgebricksException(e);
         }
-        AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
         // TokenizeOperator only supports a keyword or n-gram index.
         switch (secondaryIndex.getIndexType()) {
             case SINGLE_PARTITION_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
             case LENGTH_PARTITIONED_WORD_INVIX:
-            case LENGTH_PARTITIONED_NGRAM_INVIX: {
+            case LENGTH_PARTITIONED_NGRAM_INVIX:
                 return getBinaryTokenizerRuntime(dataverseName, datasetName, indexName, inputSchema, propagatedSchema,
-                        typeEnv, primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
-                        IndexOperation.INSERT, secondaryIndex.getIndexType(), bulkload);
-            }
-            default: {
+                        primaryKeys, secondaryKeys, recordDesc, spec, secondaryIndex.getIndexType());
+            default:
                 throw new AlgebricksException("Currently, we do not support TokenizeOperator for the index type: "
                         + secondaryIndex.getIndexType());
-            }
         }
-
-    }
-
-    // Get a Tokenizer for the bulk-loading data into a n-gram or keyword index.
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBinaryTokenizerRuntime(String dataverseName,
-            String datasetName, String indexName, IOperatorSchema inputSchema, IOperatorSchema propagatedSchema,
-            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification spec, IndexOperation indexOp, IndexType indexType, boolean bulkload)
-            throws AlgebricksException {
-
-        // Sanity checks.
-        if (primaryKeys.size() > 1) {
-            throw new AlgebricksException("Cannot tokenize composite primary key.");
-        }
-        if (secondaryKeys.size() > 1) {
-            throw new AlgebricksException("Cannot tokenize composite secondary key fields.");
-        }
-
-        boolean isPartitioned;
-        if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
-                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
-            isPartitioned = true;
-        } else {
-            isPartitioned = false;
-        }
-
-        // Number of Keys that needs to be propagated
-        int numKeys = inputSchema.getSize();
-
-        // Get the rest of Logical Variables that are not (PK or SK) and each
-        // variable's positions.
-        // These variables will be propagated through TokenizeOperator.
-        List<LogicalVariable> otherKeys = new ArrayList<LogicalVariable>();
-        if (inputSchema.getSize() > 0) {
-            for (int k = 0; k < inputSchema.getSize(); k++) {
-                boolean found = false;
-                for (LogicalVariable varKey : primaryKeys) {
-                    if (varKey.equals(inputSchema.getVariable(k))) {
-                        found = true;
-                        break;
-                    } else {
-                        found = false;
-                    }
-                }
-                if (!found) {
-                    for (LogicalVariable varKey : secondaryKeys) {
-                        if (varKey.equals(inputSchema.getVariable(k))) {
-                            found = true;
-                            break;
-                        } else {
-                            found = false;
-                        }
-                    }
-                }
-                if (!found) {
-                    otherKeys.add(inputSchema.getVariable(k));
-                }
-            }
-        }
-
-        // For tokenization, sorting and loading.
-        // One token (+ optional partitioning field) + primary keys + secondary
-        // keys + other variables
-        // secondary keys and other variables will be just passed to the
-        // IndexInsertDelete Operator.
-        int numTokenKeyPairFields = (!isPartitioned) ? 1 + numKeys : 2 + numKeys;
-
-        // generate field permutations for the input
-        int[] fieldPermutation = new int[numKeys];
-
-        int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
-        int i = 0;
-        int j = 0;
-        for (LogicalVariable varKey : primaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            modificationCallbackPrimaryKeyFields[j] = i;
-            i++;
-            j++;
-        }
-        for (LogicalVariable varKey : otherKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            i++;
-        }
-        for (LogicalVariable varKey : secondaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            i++;
-        }
-
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
-        }
-        String itemTypeName = dataset.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
-                    .getDatatype();
-
-            if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                throw new AlgebricksException("Only record types can be tokenized.");
-            }
-
-            ARecordType recType = (ARecordType) itemType;
-
-            // Index parameters.
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-
-            List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
-            List<IAType> secondaryKeyTypeEntries = secondaryIndex.getKeyFieldTypes();
-
-            int numTokenFields = (!isPartitioned) ? secondaryKeys.size() : secondaryKeys.size() + 1;
-            ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
-            ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
-
-            // Find the key type of the secondary key. If it's a derived type,
-            // return the derived type.
-            // e.g. UNORDERED LIST -> return UNORDERED LIST type
-            IAType secondaryKeyType = null;
-            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypeEntries.get(0),
-                    secondaryKeyExprs.get(0), recType);
-            secondaryKeyType = keyPairType.first;
-            List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-            i = 0;
-            for (List<String> partitioningKey : partitioningKeys) {
-                IAType keyType = recType.getSubFieldType(partitioningKey);
-                invListsTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-                ++i;
-            }
-
-            tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
-            if (isPartitioned) {
-                // The partitioning field is hardcoded to be a short *without*
-                // an Asterix type tag.
-                tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
-            }
-
-            IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(
-                    secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
-
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp());
-
-            // Generate Output Record format
-            ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
-            ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
-            ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
-
-            // The order of the output record: propagated variables (including
-            // PK and SK), token, and number of token.
-            // #1. propagate all input variables
-            for (int k = 0; k < recordDesc.getFieldCount(); k++) {
-                tokenKeyPairFields[k] = recordDesc.getFields()[k];
-                tokenKeyPairTypeTraits[k] = recordDesc.getTypeTraits()[k];
-            }
-            int tokenOffset = recordDesc.getFieldCount();
-
-            // #2. Specify the token type
-            tokenKeyPairFields[tokenOffset] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
-            tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[0];
-            tokenOffset++;
-
-            // #3. Specify the length-partitioning key: number of token
-            if (isPartitioned) {
-                tokenKeyPairFields[tokenOffset] = ShortSerializerDeserializer.INSTANCE;
-                tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[1];
-            }
-
-            RecordDescriptor tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
-            IOperatorDescriptor tokenizerOp;
-
-            // Keys to be tokenized : SK
-            int docField = fieldPermutation[fieldPermutation.length - 1];
-
-            // Keys to be propagated
-            int[] keyFields = new int[numKeys];
-            for (int k = 0; k < keyFields.length; k++) {
-                keyFields[k] = k;
-            }
-
-            tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc, tokenizerFactory, docField,
-                    keyFields, isPartitioned, true);
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(tokenizerOp,
-                    splitsAndConstraint.second);
-
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
-            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
-            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
-            ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
-        return getIndexInsertOrDeleteRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas,
-                typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, context, spec,
-                false);
-    }
-
-    private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
-            IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
-            throws AlgebricksException {
-        // No filtering condition.
-        if (filterExpr == null) {
-            return null;
-        }
-        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
-        IScalarEvaluatorFactory filterEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(filterExpr,
-                typeEnv, inputSchemas, context);
-        return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspectorFactory());
-    }
-
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(String dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
-            boolean bulkload) throws AlgebricksException {
-
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
-        }
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
-        int numKeys = primaryKeys.size() + secondaryKeys.size();
-        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
-
-        // generate field permutations
-        int[] fieldPermutation = new int[numKeys + numFilterFields];
-        int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
-        int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
-        int i = 0;
-        int j = 0;
-        for (LogicalVariable varKey : secondaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            bloomFilterKeyFields[i] = i;
-            i++;
-        }
-        for (LogicalVariable varKey : primaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            modificationCallbackPrimaryKeyFields[j] = i;
-            i++;
-            j++;
-        }
-        if (numFilterFields > 0) {
-            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
-            fieldPermutation[numKeys] = idx;
-        }
-
-        String itemTypeName = dataset.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
-                    .getDatatype();
-
-            if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                throw new AlgebricksException("Only record types can be indexed.");
-            }
-
-            ARecordType recType = (ARecordType) itemType;
-
-            // Index parameters.
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-
-            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    recType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = null;
-            int[] btreeFields = null;
-            if (filterTypeTraits != null) {
-                filterFields = new int[1];
-                filterFields[0] = numKeys;
-                btreeFields = new int[numKeys];
-                for (int k = 0; k < btreeFields.length; k++) {
-                    btreeFields[k] = k;
-                }
-            }
-
-            List<List<String>> secondaryKeyNames = secondaryIndex.getKeyFieldNames();
-            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-            ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
-            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
-            for (i = 0; i < secondaryKeys.size(); ++i) {
-                Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
-                        secondaryKeyNames.get(i), recType);
-                IAType keyType = keyPairType.first;
-                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                        true);
-                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            }
-            List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-            for (List<String> partitioningKey : partitioningKeys) {
-                IAType keyType = recType.getSubFieldType(partitioningKey);
-                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
-                        true);
-                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-                ++i;
-            }
-
-            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataverseName, datasetName, indexName, temp);
-
-            // prepare callback
-            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
-            IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
-                    : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
-                            dataset.hasMetaPart());
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
-                    new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
-                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                    storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
-                    btreeFields, filterFields, !temp);
-            IOperatorDescriptor op;
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
-            } else {
-                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
-                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                        splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
-                        fieldPermutation, indexOp,
-                        new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
-                                compactionInfo.first, compactionInfo.second,
-                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                                LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                                storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
-                                filterCmpFactories, btreeFields, filterFields, !temp),
-                        filterFactory, false, indexName, null, modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE);
-            }
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexDmlRuntime(String dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
-            IndexType indexType, boolean bulkload) throws AlgebricksException {
-
-        // Check the index is length-partitioned or not.
-        boolean isPartitioned;
-        if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
-                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
-            isPartitioned = true;
-        } else {
-            isPartitioned = false;
-        }
-
-        // Sanity checks.
-        if (primaryKeys.size() > 1) {
-            throw new AlgebricksException("Cannot create inverted index on dataset with composite primary key.");
-        }
-        // The size of secondaryKeys can be two if it receives input from its
-        // TokenizeOperator- [token, number of token]
-        if (secondaryKeys.size() > 1 && !isPartitioned) {
-            throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
-        } else if (secondaryKeys.size() > 2 && isPartitioned) {
-            throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
-        }
-
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
-        }
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
-        // For tokenization, sorting and loading.
-        // One token (+ optional partitioning field) + primary keys: [token,
-        // number of token, PK]
-        int numKeys = primaryKeys.size() + secondaryKeys.size();
-        int numTokenKeyPairFields = (!isPartitioned) ? 1 + primaryKeys.size() : 2 + primaryKeys.size();
-        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
-
-        // generate field permutations
-        int[] fieldPermutation = new int[numKeys + numFilterFields];
-        int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
-        int i = 0;
-        int j = 0;
-
-        // If the index is partitioned: [token, number of token]
-        // Otherwise: [token]
-        for (LogicalVariable varKey : secondaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            i++;
-        }
-        for (LogicalVariable varKey : primaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            modificationCallbackPrimaryKeyFields[j] = i;
-            i++;
-            j++;
-        }
-        if (numFilterFields > 0) {
-            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
-            fieldPermutation[numKeys] = idx;
-        }
-
-        String itemTypeName = dataset.getItemTypeName();
-        IAType itemType;
-        try {
-            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
-                    .getDatatype();
-
-            if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                throw new AlgebricksException("Only record types can be indexed.");
-            }
-
-            ARecordType recType = (ARecordType) itemType;
-
-            // Index parameters.
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-
-            List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
-            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-
-            int numTokenFields = 0;
-
-            // SecondaryKeys.size() can be two if it comes from the bulkload.
-            // In this case, [token, number of token] are the secondaryKeys.
-            if (!isPartitioned || (secondaryKeys.size() > 1)) {
-                numTokenFields = secondaryKeys.size();
-            } else if (isPartitioned && (secondaryKeys.size() == 1)) {
-                numTokenFields = secondaryKeys.size() + 1;
-            }
-
-            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
-            ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
-            ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
-            IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
-            IBinaryComparatorFactory[] invListComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                    dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
-
-            IAType secondaryKeyType = null;
-
-            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
-                    secondaryKeyExprs.get(0), recType);
-            secondaryKeyType = keyPairType.first;
-
-            List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-
-            i = 0;
-            for (List<String> partitioningKey : partitioningKeys) {
-                IAType keyType = recType.getSubFieldType(partitioningKey);
-                invListsTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-                ++i;
-            }
-
-            tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
-            tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
-            if (isPartitioned) {
-                // The partitioning field is hardcoded to be a short *without*
-                // an Asterix type tag.
-                tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
-                tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
-            }
-            IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(
-                    secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
-
-            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    recType, context.getBinaryComparatorFactoryProvider());
-
-            int[] filterFields = null;
-            int[] invertedIndexFields = null;
-            int[] filterFieldsForNonBulkLoadOps = null;
-            int[] invertedIndexFieldsForNonBulkLoadOps = null;
-            if (filterTypeTraits != null) {
-                filterFields = new int[1];
-                filterFields[0] = numTokenFields + primaryKeys.size();
-                invertedIndexFields = new int[numTokenFields + primaryKeys.size()];
-                for (int k = 0; k < invertedIndexFields.length; k++) {
-                    invertedIndexFields[k] = k;
-                }
-
-                filterFieldsForNonBulkLoadOps = new int[numFilterFields];
-                filterFieldsForNonBulkLoadOps[0] = numTokenKeyPairFields;
-                invertedIndexFieldsForNonBulkLoadOps = new int[numTokenKeyPairFields];
-                for (int k = 0; k < invertedIndexFieldsForNonBulkLoadOps.length; k++) {
-                    invertedIndexFieldsForNonBulkLoadOps[k] = k;
-                }
-            }
-
-            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataverseName, datasetName, indexName, temp);
-
-            // prepare callback
-            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
-            IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_INVERTED_INDEX)
-                    : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart());
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory indexDataFlowFactory;
-            if (!isPartitioned) {
-                indexDataFlowFactory = new LSMInvertedIndexDataflowHelperFactory(
-                        new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
-                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
-                        storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
-                        filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
-                        invertedIndexFieldsForNonBulkLoadOps, !temp);
-            } else {
-                indexDataFlowFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
-                        new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
-                        compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
-                        storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
-                        filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
-                        invertedIndexFieldsForNonBulkLoadOps, !temp);
-            }
-            IOperatorDescriptor op;
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new LSMInvertedIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, false,
-                        numElementsHint, false, appContext.getStorageManagerInterface(), splitsAndConstraint.first,
-                        appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
-                        invListsTypeTraits, invListComparatorFactories, tokenizerFactory, indexDataFlowFactory);
-            } else {
-                op = new AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
-                        appContext.getStorageManagerInterface(), splitsAndConstraint.first,
-                        appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
-                        invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
-                        indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName);
-            }
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(String dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
-            boolean bulkload) throws AlgebricksException {
-        try {
-            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-
-            boolean temp = dataset.getDatasetDetails().isTemp();
-            isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
-
-            String itemTypeName = dataset.getItemTypeName();
-            IAType itemType = MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
-            if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                throw new AlgebricksException("Only record types can be indexed.");
-            }
-            ARecordType recType = (ARecordType) itemType;
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-            List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
-            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
-                    secondaryKeyExprs.get(0), recType);
-            IAType spatialType = keyPairType.first;
-            boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
-                    || spatialType.getTypeTag() == ATypeTag.POINT3D;
-            int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
-            int numSecondaryKeys = dimension * 2;
-            int numPrimaryKeys = primaryKeys.size();
-            int numKeys = numSecondaryKeys + numPrimaryKeys;
-            ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
-            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
-
-            int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
-            int[] fieldPermutation = new int[numKeys + numFilterFields];
-            int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
-            int i = 0;
-            int j = 0;
-
-            for (LogicalVariable varKey : secondaryKeys) {
-                int idx = propagatedSchema.findVariable(varKey);
-                fieldPermutation[i] = idx;
-                i++;
-            }
-            for (LogicalVariable varKey : primaryKeys) {
-                int idx = propagatedSchema.findVariable(varKey);
-                fieldPermutation[i] = idx;
-                modificationCallbackPrimaryKeyFields[j] = i;
-                i++;
-                j++;
-            }
-
-            if (numFilterFields > 0) {
-                int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
-                fieldPermutation[numKeys] = idx;
-            }
-            IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
-            IPrimitiveValueProviderFactory[] valueProviderFactories =
-                    new IPrimitiveValueProviderFactory[numSecondaryKeys];
-            for (i = 0; i < numSecondaryKeys; i++) {
-                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
-                        .getBinaryComparatorFactory(nestedKeyType, true);
-                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
-                valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
-            }
-            List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-            for (List<String> partitioningKey : partitioningKeys) {
-                IAType keyType = recType.getSubFieldType(partitioningKey);
-                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-                ++i;
-            }
-            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
-            IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                    dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
-            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataverseName, datasetName, indexName, temp);
-            int[] btreeFields = new int[primaryComparatorFactories.length];
-            for (int k = 0; k < btreeFields.length; k++) {
-                btreeFields[k] = k + numSecondaryKeys;
-            }
-
-            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    recType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = null;
-            int[] rtreeFields = null;
-            if (filterTypeTraits != null) {
-                filterFields = new int[1];
-                filterFields[0] = numSecondaryKeys + numPrimaryKeys;
-                rtreeFields = new int[numSecondaryKeys + numPrimaryKeys];
-                for (int k = 0; k < rtreeFields.length; k++) {
-                    rtreeFields[k] = k;
-                }
-            }
-
-            // prepare callback
-            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
-            IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider,
-                            indexOp, ResourceType.LSM_RTREE)
-                    : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider,
-                            indexOp, ResourceType.LSM_RTREE,
-                            dataset.hasMetaPart());
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-
-            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(comparatorFactories,
-                    primaryComparatorFactories);
-            IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
-                    valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
-                    new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
-                    compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                    AqlMetadataProvider.proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
-                    rtreeFields, filterTypeTraits, filterCmpFactories, filterFields, !temp, isPointMBR);
-            IOperatorDescriptor op;
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
-                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        primaryComparatorFactories, btreeFields, fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idff);
-            } else {
-                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
-                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                        splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, indexOp,
-                        idff, filterFactory, false, indexName, null, modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE);
-            }
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public JobId getJobId() {
-        return jobId;
-    }
-
-    public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields)
-            throws AlgebricksException {
-        return AqlLinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true,
-                numKeyFields / 2);
     }
 
     /**
@@ -1945,12 +952,48 @@
         for (String nd : nodeGroup) {
             numPartitions += AsterixClusterProperties.INSTANCE.getNodePartitionsCount(nd);
         }
-        return numElementsHint /= numPartitions;
+        numElementsHint = numElementsHint / numPartitions;
+        return numElementsHint;
     }
 
-    @Override
-    public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
-        return AsterixBuiltinFunctions.lookupFunction(fid);
+    protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
+            Map<String, String> configuration, ARecordType itemType, boolean isPKAutoGenerated,
+            List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException {
+        try {
+            configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName());
+            IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName,
+                    configuration, itemType, metaType);
+
+            // check to see if dataset is indexed
+            Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(),
+                    dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
+
+            if (filesIndex != null && filesIndex.getPendingOp() == 0) {
+                // get files
+                List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+                Iterator<ExternalFile> iterator = files.iterator();
+                while (iterator.hasNext()) {
+                    if (iterator.next().getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
+                        iterator.remove();
+                    }
+                }
+            }
+
+            return adapterFactory;
+        } catch (Exception e) {
+            throw new AlgebricksException("Unable to create adapter", e);
+        }
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields)
+            throws AlgebricksException {
+        return AqlLinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true,
+                numKeyFields / 2);
     }
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataset(
@@ -1971,7 +1014,7 @@
 
     public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
             throws MetadataException {
-        DatasourceAdapter adapter = null;
+        DatasourceAdapter adapter;
         // search in default namespace (built-in adapter)
         adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
 
@@ -1982,98 +1025,16 @@
         return adapter;
     }
 
-    public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
-        String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName())
-                : dataverse;
-        if (dv == null) {
-            return null;
-        }
-        try {
-            return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dv, dataset);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException {
-        NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroupName);
-        List<String> partitions = new ArrayList<>();
-        for (String location : nodeGroup.getNodeNames()) {
-            int numPartitions = AsterixClusterProperties.INSTANCE.getNodePartitionsCount(location);
-            for (int i = 0; i < numPartitions; i++) {
-                partitions.add(location);
-            }
-        }
-        return new DefaultNodeGroupDomain(partitions);
-    }
-
-    public IAType findType(String dataverse, String typeName) throws AlgebricksException {
-        if (dataverse == null || typeName == null) {
-            return null;
-        }
-        Datatype type;
-        try {
-            type = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverse, typeName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(
-                    "Metadata exception while looking up type '" + typeName + "' in dataverse '" + dataverse + "'", e);
-        }
-        if (type == null) {
-            throw new AlgebricksException("Type name '" + typeName + "' unknown in dataverse '" + dataverse + "'");
-        }
-        return type.getDatatype();
-    }
-
-    public Feed findFeed(String dataverse, String feedName) throws AlgebricksException {
-        try {
-            return MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, feedName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public FeedPolicyEntity findFeedPolicy(String dataverse, String policyName) throws AlgebricksException {
-        try {
-            return MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverse, policyName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
-        try {
-            return MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
     public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
         return AsterixClusterProperties.INSTANCE.getClusterLocations();
     }
 
-    public IDataFormat getFormat() {
-        return FormatUtils.getDefaultFormat();
-    }
-
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
             String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
         return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx, dataverseName,
                 datasetName, targetIdxName, create);
     }
 
-    public AsterixStorageProperties getStorageProperties() {
-        return storageProperties;
-    }
-
-    public Map<String, Integer> getLocks() {
-        return locks;
-    }
-
-    public void setLocks(Map<String, Integer> locks) {
-        this.locks = locks;
-    }
-
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(
             JobSpecification jobSpec, Dataset dataset, Index secondaryIndex, int[] ridIndexes, boolean retainInput,
             IVariableTypeEnvironment typeEnv, List<LogicalVariable> outputVars, IOperatorSchema opSchema,
@@ -2081,7 +1042,7 @@
             throws AlgebricksException {
         try {
             // Get data type
-            IAType itemType = null;
+            IAType itemType;
             itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
                     dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
 
@@ -2123,9 +1084,7 @@
                     appContext.getStorageManagerInterface(), spPc.first, dataset.getDatasetId(),
                     metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), searchOpCallbackFactory,
                     retainMissing, context.getMissingWriterFactory());
-
-            // Return value
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, spPc.second);
+            return new Pair<>(op, spPc.second);
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
@@ -2149,7 +1108,7 @@
         int numKeys = primaryKeys.size();
         int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
         int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size();
-        // Move key fields to front. {keys, record, filters}
+        // Move key fields to front. [keys, record, filters]
         int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields];
         int[] bloomFilterKeyFields = new int[numKeys];
         int i = 0;
@@ -2190,8 +1149,8 @@
             IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
                     itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
+                            indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2277,23 +1236,192 @@
         }
     }
 
-    // TODO refactor this method
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
-            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
-            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
-            ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
-            LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification spec) throws AlgebricksException {
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
+            JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
+            throws AlgebricksException {
+        if (itemType.getTypeTag() != ATypeTag.RECORD) {
+            throw new AlgebricksException("Can only scan datasets of records.");
+        }
+
+        ISerializerDeserializer<?> payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+
+        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc,
+                adapterFactory);
+
+        AlgebricksPartitionConstraint constraint;
+        try {
+            constraint = adapterFactory.getPartitionConstraint();
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+
+        return new Pair<>(dataScanner, constraint);
+    }
+
+    private Pair<IBinaryComparatorFactory[], ITypeTraits[]> getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
+            List<List<String>> sidxKeyFieldNames, List<IAType> sidxKeyFieldTypes, List<List<String>> pidxKeyFieldNames,
+            ARecordType recType, DatasetType dsType, boolean hasMeta, List<Integer> primaryIndexKeyIndicators,
+            List<Integer> secondaryIndexIndicators, ARecordType metaType) throws AlgebricksException {
+
+        IBinaryComparatorFactory[] comparatorFactories;
+        ITypeTraits[] typeTraits;
+        int sidxKeyFieldCount = sidxKeyFieldNames.size();
+        int pidxKeyFieldCount = pidxKeyFieldNames.size();
+        typeTraits = new ITypeTraits[sidxKeyFieldCount + pidxKeyFieldCount];
+        comparatorFactories = new IBinaryComparatorFactory[sidxKeyFieldCount + pidxKeyFieldCount];
+
+        int i = 0;
+        for (; i < sidxKeyFieldCount; ++i) {
+            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i),
+                    sidxKeyFieldNames.get(i),
+                    (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
+            IAType keyType = keyPairType.first;
+            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                    true);
+            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+        }
+
+        for (int j = 0; j < pidxKeyFieldCount; ++j, ++i) {
+            IAType keyType = null;
+            try {
+                switch (dsType) {
+                    case INTERNAL:
+                        keyType = (hasMeta && primaryIndexKeyIndicators.get(j).intValue() == 1)
+                                ? metaType.getSubFieldType(pidxKeyFieldNames.get(j))
+                                : recType.getSubFieldType(pidxKeyFieldNames.get(j));
+                        break;
+                    case EXTERNAL:
+                        keyType = IndexingConstants.getFieldType(j);
+                        break;
+                    default:
+                        throw new AlgebricksException("Unknown Dataset Type");
+                }
+            } catch (AsterixException e) {
+                throw new AlgebricksException(e);
+            }
+            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                    true);
+            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+        }
+
+        return new Pair<>(comparatorFactories, typeTraits);
+    }
+
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp,
+            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+            LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec, boolean bulkload,
+            List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException {
+
+        String datasetName = dataSource.getId().getDatasourceName();
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataSource.getId().getDataverseName(),
+                datasetName);
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+        int numKeys = keys.size();
+        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+        // Move key fields to front.
+        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
+                + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())];
+        int[] bloomFilterKeyFields = new int[numKeys];
+        int i = 0;
+        for (LogicalVariable varKey : keys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+        fieldPermutation[i++] = propagatedSchema.findVariable(payload);
+        if (numFilterFields > 0) {
+            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
+            fieldPermutation[i++] = idx;
+        }
+        if (additionalNonFilteringFields != null) {
+            for (LogicalVariable variable : additionalNonFilteringFields) {
+                int idx = propagatedSchema.findVariable(variable);
+                fieldPermutation[i++] = idx;
+            }
+        }
+
+        try {
+            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), dataset.getDatasetName());
+            String indexName = primaryIndex.getIndexName();
+            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype();
+            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
+            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType);
+
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+                    itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                    splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
+                            indexName, temp);
+
+            // prepare callback
+            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+            int datasetId = dataset.getDatasetId();
+            int[] primaryKeyFields = new int[numKeys];
+            for (i = 0; i < numKeys; i++) {
+                primaryKeyFields[i] = i;
+            }
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    itemType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = DatasetUtils.createFilterFields(dataset);
+            int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+
+            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+            IModificationOperationCallbackFactory modificationCallbackFactory = temp
+                    ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
+                    : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
+                            txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, dataset.hasMetaPart());
+
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
+                    new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+                    new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                    storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+                    btreeFields, filterFields, !temp);
+            IOperatorDescriptor op;
+            if (bulkload) {
+                long numElementsHint = getCardinalityPerPartitionHint(dataset);
+                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
+                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+                        comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh);
+            } else {
+                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
+                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                        splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+                        fieldPermutation, indexOp, idfh, null, true, indexName, null, modificationCallbackFactory,
+                        NoOpOperationCallbackFactory.INSTANCE);
+            }
+            return new Pair<>(op, splitsAndConstraint.second);
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteOrUpsertRuntime(
+            IndexOperation indexOp, IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec, boolean bulkload, List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKey) throws AlgebricksException {
         String indexName = dataSourceIndex.getId();
         String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
         String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
 
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
         Index secondaryIndex;
         try {
             secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -2301,44 +1429,374 @@
         } catch (MetadataException e) {
             throw new AlgebricksException(e);
         }
-        AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
+
         ArrayList<LogicalVariable> prevAdditionalFilteringKeys = null;
-        if (prevAdditionalFilteringKey != null) {
+        if (indexOp == IndexOperation.UPSERT && prevAdditionalFilteringKey != null) {
             prevAdditionalFilteringKeys = new ArrayList<>();
             prevAdditionalFilteringKeys.add(prevAdditionalFilteringKey);
         }
+        AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
         switch (secondaryIndex.getIndexType()) {
-            case BTREE: {
-                return getBTreeUpsertRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
-                        primaryKeys, secondaryKeys, additionalFilteringKeys, filterFactory, recordDesc, context, spec,
-                        prevSecondaryKeys, prevAdditionalFilteringKeys);
-            }
-            case RTREE: {
-                return getRTreeUpsertRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
-                        primaryKeys, secondaryKeys, additionalFilteringKeys, filterFactory, recordDesc, context, spec,
-                        prevSecondaryKeys, prevAdditionalFilteringKeys);
-            }
+            case BTREE:
+                return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
+                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
+                        bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
+            case RTREE:
+                return getRTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
+                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
+                        bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
             case SINGLE_PARTITION_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
             case LENGTH_PARTITIONED_WORD_INVIX:
-            case LENGTH_PARTITIONED_NGRAM_INVIX: {
-                return getInvertedIndexUpsertRuntime(dataverseName, datasetName, indexName, propagatedSchema,
-                        primaryKeys, secondaryKeys, additionalFilteringKeys, filterFactory, recordDesc, context, spec,
-                        secondaryIndex.getIndexType(), prevSecondaryKeys, prevAdditionalFilteringKeys);
-            }
-            default: {
+            case LENGTH_PARTITIONED_NGRAM_INVIX:
+                return getInvertedIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
+                        secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
+                        secondaryIndex.getIndexType(), bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
+            default:
                 throw new AlgebricksException(
-                        "upsert is not implemented for index type: " + secondaryIndex.getIndexType());
-            }
+                        indexOp.name() + "Insert, upsert, and delete not implemented for index type: "
+                                + secondaryIndex.getIndexType());
         }
     }
 
-    //TODO: refactor this method
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexUpsertRuntime(String dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalFilteringKeys, AsterixTupleFilterFactory filterFactory,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexType indexType,
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeRuntime(String dataverseName,
+            String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+            AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec, IndexOperation indexOp, boolean bulkload, List<LogicalVariable> prevSecondaryKeys,
+            List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+        int numKeys = primaryKeys.size() + secondaryKeys.size();
+        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+
+        // generate field permutations
+        int[] fieldPermutation = new int[numKeys + numFilterFields];
+        int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
+        int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+        int i = 0;
+        int j = 0;
+        for (LogicalVariable varKey : secondaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+        for (LogicalVariable varKey : primaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            modificationCallbackPrimaryKeyFields[j] = i;
+            i++;
+            j++;
+        }
+
+        if (numFilterFields > 0) {
+            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
+            fieldPermutation[numKeys] = idx;
+        }
+
+        int[] prevFieldPermutation = null;
+        if (indexOp == IndexOperation.UPSERT) {
+            // generate field permutations for prev record
+            prevFieldPermutation = new int[numKeys + numFilterFields];
+            int k = 0;
+            for (LogicalVariable varKey : prevSecondaryKeys) {
+                int idx = propagatedSchema.findVariable(varKey);
+                prevFieldPermutation[k] = idx;
+                k++;
+            }
+            for (LogicalVariable varKey : primaryKeys) {
+                int idx = propagatedSchema.findVariable(varKey);
+                prevFieldPermutation[k] = idx;
+                k++;
+            }
+            // Filter can only be one field!
+            if (numFilterFields > 0) {
+                int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
+                prevFieldPermutation[numKeys] = idx;
+            }
+        }
+
+        String itemTypeName = dataset.getItemTypeName();
+        IAType itemType;
+        try {
+            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
+                    .getDatatype();
+            validateRecordType(itemType);
+            ARecordType recType = (ARecordType) itemType;
+
+            // Index parameters.
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    recType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = null;
+            int[] btreeFields = null;
+            if (filterTypeTraits != null) {
+                filterFields = new int[1];
+                filterFields[0] = numKeys;
+                btreeFields = new int[numKeys];
+                for (int k = 0; k < btreeFields.length; k++) {
+                    btreeFields[k] = k;
+                }
+            }
+
+            List<List<String>> secondaryKeyNames = secondaryIndex.getKeyFieldNames();
+            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
+            ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
+            for (i = 0; i < secondaryKeys.size(); ++i) {
+                Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
+                        secondaryKeyNames.get(i), recType);
+                IAType keyType = keyPairType.first;
+                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                        true);
+                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            }
+            List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+            for (List<String> partitioningKey : partitioningKeys) {
+                IAType keyType = recType.getSubFieldType(partitioningKey);
+                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                        true);
+                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+                ++i;
+            }
+
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                    splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
+
+            // prepare callback
+            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+            int datasetId = dataset.getDatasetId();
+            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+            IModificationOperationCallbackFactory modificationCallbackFactory = temp
+                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
+                    : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
+                            dataset.hasMetaPart());
+
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
+                    new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                    storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
+                    btreeFields, filterFields, !temp);
+            IOperatorDescriptor op;
+            if (bulkload) {
+                long numElementsHint = getCardinalityPerPartitionHint(dataset);
+                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
+                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+                        comparatorFactories, bloomFilterKeyFields, fieldPermutation,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh);
+            } else if (indexOp == IndexOperation.UPSERT) {
+                op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, recordDesc,
+                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                        splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+                        fieldPermutation, idfh, filterFactory, false, indexName, null, modificationCallbackFactory,
+                        NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
+            } else {
+                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
+                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                        splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
+                        fieldPermutation, indexOp,
+                        new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
+                                compactionInfo.first, compactionInfo.second,
+                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                                LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                                storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
+                                filterCmpFactories, btreeFields, filterFields, !temp),
+                        filterFactory, false, indexName, null, modificationCallbackFactory,
+                        NoOpOperationCallbackFactory.INSTANCE);
+            }
+            return new Pair<>(op, splitsAndConstraint.second);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeRuntime(String dataverseName,
+            String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+            AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec, IndexOperation indexOp, boolean bulkload, List<LogicalVariable> prevSecondaryKeys,
+            List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
+
+        try {
+            Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
+            boolean temp = dataset.getDatasetDetails().isTemp();
+            isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
+            String itemTypeName = dataset.getItemTypeName();
+            IAType itemType = MetadataManager.INSTANCE
+                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
+            validateRecordType(itemType);
+            ARecordType recType = (ARecordType) itemType;
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), indexName);
+            List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
+            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
+            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+                    secondaryKeyExprs.get(0), recType);
+            IAType spatialType = keyPairType.first;
+            boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
+                    || spatialType.getTypeTag() == ATypeTag.POINT3D;
+            int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+            int numSecondaryKeys = dimension * 2;
+            int numPrimaryKeys = primaryKeys.size();
+            int numKeys = numSecondaryKeys + numPrimaryKeys;
+            ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
+
+            int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+            int[] fieldPermutation = new int[numKeys + numFilterFields];
+            int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+            int i = 0;
+            int j = 0;
+
+            for (LogicalVariable varKey : secondaryKeys) {
+                int idx = propagatedSchema.findVariable(varKey);
+                fieldPermutation[i] = idx;
+                i++;
+            }
+            for (LogicalVariable varKey : primaryKeys) {
+                int idx = propagatedSchema.findVariable(varKey);
+                fieldPermutation[i] = idx;
+                modificationCallbackPrimaryKeyFields[j] = i;
+                i++;
+                j++;
+            }
+
+            if (numFilterFields > 0) {
+                int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
+                fieldPermutation[numKeys] = idx;
+            }
+
+            int[] prevFieldPermutation = null;
+            if (indexOp == IndexOperation.UPSERT) {
+                // Get field permutation for previous value
+                prevFieldPermutation = new int[numKeys + numFilterFields];
+                i = 0;
+
+                // Get field permutation for new value
+                for (LogicalVariable varKey : prevSecondaryKeys) {
+                    int idx = propagatedSchema.findVariable(varKey);
+                    prevFieldPermutation[i] = idx;
+                    i++;
+                }
+                for (int k = 0; k < numPrimaryKeys; k++) {
+                    prevFieldPermutation[k + i] = fieldPermutation[k + i];
+                    i++;
+                }
+
+                if (numFilterFields > 0) {
+                    int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
+                    prevFieldPermutation[numKeys] = idx;
+                }
+            }
+
+            IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+            IPrimitiveValueProviderFactory[] valueProviderFactories =
+                    new IPrimitiveValueProviderFactory[numSecondaryKeys];
+            for (i = 0; i < numSecondaryKeys; i++) {
+                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+                        .getBinaryComparatorFactory(nestedKeyType, true);
+                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+                valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+            }
+            List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+            for (List<String> partitioningKey : partitioningKeys) {
+                IAType keyType = recType.getSubFieldType(partitioningKey);
+                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+                ++i;
+            }
+            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
+            IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+                    dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
+            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
+                    splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
+            int[] btreeFields = new int[primaryComparatorFactories.length];
+            for (int k = 0; k < btreeFields.length; k++) {
+                btreeFields[k] = k + numSecondaryKeys;
+            }
+
+            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    recType, context.getBinaryComparatorFactoryProvider());
+            int[] filterFields = null;
+            int[] rtreeFields = null;
+            if (filterTypeTraits != null) {
+                filterFields = new int[1];
+                filterFields[0] = numSecondaryKeys + numPrimaryKeys;
+                rtreeFields = new int[numSecondaryKeys + numPrimaryKeys];
+                for (int k = 0; k < rtreeFields.length; k++) {
+                    rtreeFields[k] = k;
+                }
+            }
+
+            // prepare callback
+            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+            int datasetId = dataset.getDatasetId();
+            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+            IModificationOperationCallbackFactory modificationCallbackFactory = temp
+                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE)
+                    : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE,
+                            dataset.hasMetaPart());
+
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, mdTxnCtx);
+
+            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(comparatorFactories,
+                    primaryComparatorFactories);
+            IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
+                    valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
+                    new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
+                    compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
+                    AqlMetadataProvider.proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
+                    rtreeFields, filterTypeTraits, filterCmpFactories, filterFields, !temp, isPointMBR);
+            IOperatorDescriptor op;
+            if (bulkload) {
+                long numElementsHint = getCardinalityPerPartitionHint(dataset);
+                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(),
+                        appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
+                        primaryComparatorFactories, btreeFields, fieldPermutation,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idff);
+            } else if (indexOp == IndexOperation.UPSERT) {
+                op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, recordDesc,
+                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                        splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, idff,
+                        filterFactory, false, indexName, null, modificationCallbackFactory,
+                        NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
+            } else {
+                op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
+                        appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
+                        splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, indexOp,
+                        idff, filterFactory, false, indexName, null, modificationCallbackFactory,
+                        NoOpOperationCallbackFactory.INSTANCE);
+            }
+            return new Pair<>(op, splitsAndConstraint.second);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexRuntime(String dataverseName,
+            String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+            AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification spec, IndexOperation indexOp, IndexType indexType, boolean bulkload,
             List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
             throws AlgebricksException {
         // Check the index is length-partitioned or not.
@@ -2362,10 +1820,7 @@
             throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
         }
 
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
-        }
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
         boolean temp = dataset.getDatasetDetails().isTemp();
         isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
 
@@ -2397,42 +1852,41 @@
             j++;
         }
         if (numFilterFields > 0) {
-            int idx = propagatedSchema.findVariable(additionalFilteringKeys.get(0));
+            int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
             fieldPermutation[numKeys] = idx;
         }
 
-        // Find permutations for prev value
-        int[] prevFieldPermutation = new int[numKeys + numFilterFields];
-        i = 0;
+        int[] prevFieldPermutation = null;
+        if (indexOp == IndexOperation.UPSERT) {
+            // Find permutations for prev value
+            prevFieldPermutation = new int[numKeys + numFilterFields];
+            i = 0;
 
-        // If the index is partitioned: [token, number of token]
-        // Otherwise: [token]
-        for (LogicalVariable varKey : prevSecondaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            prevFieldPermutation[i] = idx;
-            i++;
-        }
+            // If the index is partitioned: [token, number of token]
+            // Otherwise: [token]
+            for (LogicalVariable varKey : prevSecondaryKeys) {
+                int idx = propagatedSchema.findVariable(varKey);
+                prevFieldPermutation[i] = idx;
+                i++;
+            }
 
-        for (int k = 0; k < primaryKeys.size(); k++) {
-            prevFieldPermutation[k + i] = fieldPermutation[k + i];
-            i++;
-        }
+            for (int k = 0; k < primaryKeys.size(); k++) {
+                prevFieldPermutation[k + i] = fieldPermutation[k + i];
+                i++;
+            }
 
-        if (numFilterFields > 0) {
-            int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
-            prevFieldPermutation[numKeys] = idx;
+            if (numFilterFields > 0) {
+                int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
+                prevFieldPermutation[numKeys] = idx;
+            }
         }
 
         String itemTypeName = dataset.getItemTypeName();
         IAType itemType;
         try {
-            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName)
+            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
                     .getDatatype();
-
-            if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                throw new AlgebricksException("Only record types can be indexed.");
-            }
-
+            validateRecordType(itemType);
             ARecordType recType = (ARecordType) itemType;
 
             // Index parameters.
@@ -2459,8 +1913,7 @@
             IBinaryComparatorFactory[] invListComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
                     dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
 
-            IAType secondaryKeyType = null;
-
+            IAType secondaryKeyType;
             Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
                     secondaryKeyExprs.get(0), recType);
             secondaryKeyType = keyPairType.first;
@@ -2511,8 +1964,7 @@
 
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataverseName, datasetName, indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2520,10 +1972,10 @@
             TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
                             ResourceType.LSM_INVERTED_INDEX)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
                             ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
@@ -2548,208 +2000,102 @@
                         filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
                         invertedIndexFieldsForNonBulkLoadOps, !temp);
             }
-            IOperatorDescriptor op = new AsterixLSMInvertedIndexUpsertOperatorDescriptor(spec, recordDesc,
-                    appContext.getStorageManagerInterface(), splitsAndConstraint.first,
-                    appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
-                    invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation,
-                    indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName, prevFieldPermutation);
-
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
+            IOperatorDescriptor op;
+            if (bulkload) {
+                long numElementsHint = getCardinalityPerPartitionHint(dataset);
+                op = new LSMInvertedIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, false,
+                        numElementsHint, false, appContext.getStorageManagerInterface(), splitsAndConstraint.first,
+                        appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+                        invListsTypeTraits, invListComparatorFactories, tokenizerFactory, indexDataFlowFactory);
+            } else if (indexOp == IndexOperation.UPSERT) {
+                op = new AsterixLSMInvertedIndexUpsertOperatorDescriptor(spec, recordDesc,
+                        appContext.getStorageManagerInterface(), splitsAndConstraint.first,
+                        appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+                        invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation,
+                        indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName,
+                        prevFieldPermutation);
+            } else {
+                op = new AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
+                        appContext.getStorageManagerInterface(), splitsAndConstraint.first,
+                        appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
+                        invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
+                        indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName);
+            }
+            return new Pair<>(op, splitsAndConstraint.second);
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
     }
 
-    //TODO: refactor this method
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeUpsertRuntime(String dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalFilteringKeys, AsterixTupleFilterFactory filterFactory,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
-            List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
-            throws AlgebricksException {
-        try {
-            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+    // Get a Tokenizer for the bulk-loading data into a n-gram or keyword index.
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBinaryTokenizerRuntime(String dataverseName,
+            String datasetName, String indexName, IOperatorSchema inputSchema, IOperatorSchema propagatedSchema,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
+            JobSpecification spec, IndexType indexType) throws AlgebricksException {
 
-            boolean temp = dataset.getDatasetDetails().isTemp();
-            isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+        // Sanity checks.
+        if (primaryKeys.size() > 1) {
+            throw new AlgebricksException("Cannot tokenize composite primary key.");
+        }
+        if (secondaryKeys.size() > 1) {
+            throw new AlgebricksException("Cannot tokenize composite secondary key fields.");
+        }
 
-            String itemTypeName = dataset.getItemTypeName();
-            IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, itemTypeName).getDatatype();
-            if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                throw new AlgebricksException("Only record types can be indexed.");
-            }
-            ARecordType recType = (ARecordType) itemType;
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
+        boolean isPartitioned;
+        if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
+            isPartitioned = true;
+        } else {
+            isPartitioned = false;
+        }
 
-            List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
-            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
-                    secondaryKeyExprs.get(0), recType);
-            IAType spatialType = keyPairType.first;
+        // Number of Keys that needs to be propagated
+        int numKeys = inputSchema.getSize();
 
-            boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
-                    || spatialType.getTypeTag() == ATypeTag.POINT3D;
-            int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
-            int numSecondaryKeys = dimension * 2;
-            int numPrimaryKeys = primaryKeys.size();
-            int numKeys = numSecondaryKeys + numPrimaryKeys;
-            ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
-            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
-
-            int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
-            int[] fieldPermutation = new int[numKeys + numFilterFields];
-            int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
-            int i = 0;
-            int j = 0;
-
-            // Get field permutation for new value
-            for (LogicalVariable varKey : secondaryKeys) {
-                int idx = propagatedSchema.findVariable(varKey);
-                fieldPermutation[i] = idx;
-                i++;
-            }
-            for (LogicalVariable varKey : primaryKeys) {
-                int idx = propagatedSchema.findVariable(varKey);
-                fieldPermutation[i] = idx;
-                modificationCallbackPrimaryKeyFields[j] = i;
-                i++;
-                j++;
-            }
-
-            if (numFilterFields > 0) {
-                int idx = propagatedSchema.findVariable(additionalFilteringKeys.get(0));
-                fieldPermutation[numKeys] = idx;
-            }
-
-            // Get field permutation for previous value
-            int[] prevFieldPermutation = new int[numKeys + numFilterFields];
-            i = 0;
-
-            // Get field permutation for new value
-            for (LogicalVariable varKey : prevSecondaryKeys) {
-                int idx = propagatedSchema.findVariable(varKey);
-                prevFieldPermutation[i] = idx;
-                i++;
-            }
-            for (int k = 0; k < numPrimaryKeys; k++) {
-                prevFieldPermutation[k + i] = fieldPermutation[k + i];
-                i++;
-            }
-
-            if (numFilterFields > 0) {
-                int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
-                prevFieldPermutation[numKeys] = idx;
-            }
-
-            IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
-            IPrimitiveValueProviderFactory[] valueProviderFactories =
-                    new IPrimitiveValueProviderFactory[numSecondaryKeys];
-            for (i = 0; i < numSecondaryKeys; i++) {
-                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
-                        .getBinaryComparatorFactory(nestedKeyType, true);
-                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
-                valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
-            }
-            List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-            for (List<String> partitioningKey : partitioningKeys) {
-                IAType keyType = recType.getSubFieldType(partitioningKey);
-                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-                ++i;
-            }
-
-            ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset);
-            IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                    dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
-            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataverseName, datasetName, indexName, temp);
-            int[] btreeFields = new int[primaryComparatorFactories.length];
-            for (int k = 0; k < btreeFields.length; k++) {
-                btreeFields[k] = k + numSecondaryKeys;
-            }
-
-            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    recType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = null;
-            int[] rtreeFields = null;
-            if (filterTypeTraits != null) {
-                filterFields = new int[1];
-                filterFields[0] = numSecondaryKeys + numPrimaryKeys;
-                rtreeFields = new int[numSecondaryKeys + numPrimaryKeys];
-                for (int k = 0; k < rtreeFields.length; k++) {
-                    rtreeFields[k] = k;
+        // Get the rest of Logical Variables that are not (PK or SK) and each
+        // variable's positions.
+        // These variables will be propagated through TokenizeOperator.
+        List<LogicalVariable> otherKeys = new ArrayList<>();
+        if (inputSchema.getSize() > 0) {
+            for (int k = 0; k < inputSchema.getSize(); k++) {
+                boolean found = false;
+                for (LogicalVariable varKey : primaryKeys) {
+                    if (varKey.equals(inputSchema.getVariable(k))) {
+                        found = true;
+                        break;
+                    } else {
+                        found = false;
+                    }
+                }
+                if (!found) {
+                    for (LogicalVariable varKey : secondaryKeys) {
+                        if (varKey.equals(inputSchema.getVariable(k))) {
+                            found = true;
+                            break;
+                        } else {
+                            found = false;
+                        }
+                    }
+                }
+                if (!found) {
+                    otherKeys.add(inputSchema.getVariable(k));
                 }
             }
-
-            // prepare callback
-            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
-            IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
-                            ResourceType.LSM_RTREE)
-                    : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
-                            ResourceType.LSM_RTREE, dataset.hasMetaPart());
-
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(comparatorFactories,
-                    primaryComparatorFactories);
-            IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
-                    valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
-                    new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
-                    compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                    AqlMetadataProvider.proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
-                    rtreeFields, filterTypeTraits, filterCmpFactories, filterFields, !temp, isPointMBR);
-            AsterixLSMTreeUpsertOperatorDescriptor op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, recordDesc,
-                    appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                    splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, idff,
-                    filterFactory, false, indexName, null, modificationCallbackFactory,
-                    NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
         }
-    }
 
-    //TODO: refactor this method
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeUpsertRuntime(String dataverseName,
-            String datasetName, String indexName, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalFilteringKeys, AsterixTupleFilterFactory filterFactory,
-            RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
-            List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
-            throws AlgebricksException {
-        // we start with the btree
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
-        }
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+        // For tokenization, sorting and loading.
+        // One token (+ optional partitioning field) + primary keys + secondary
+        // keys + other variables
+        // secondary keys and other variables will be just passed to the
+        // IndexInsertDelete Operator.
+        int numTokenKeyPairFields = (!isPartitioned) ? 1 + numKeys : 2 + numKeys;
 
-        int numKeys = primaryKeys.size() + secondaryKeys.size();
-        int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
+        // generate field permutations for the input
+        int[] fieldPermutation = new int[numKeys];
 
-        // generate field permutations
-        int[] fieldPermutation = new int[numKeys + numFilterFields];
-        int[] bloomFilterKeyFields = new int[secondaryKeys.size()];
         int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
         int i = 0;
         int j = 0;
-        for (LogicalVariable varKey : secondaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            bloomFilterKeyFields[i] = i;
-            i++;
-        }
         for (LogicalVariable varKey : primaryKeys) {
             int idx = propagatedSchema.findVariable(varKey);
             fieldPermutation[i] = idx;
@@ -2757,39 +2103,26 @@
             i++;
             j++;
         }
-        // Filter can only be one field!
-        if (numFilterFields > 0) {
-            int idx = propagatedSchema.findVariable(additionalFilteringKeys.get(0));
-            fieldPermutation[numKeys] = idx;
+        for (LogicalVariable varKey : otherKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            i++;
+        }
+        for (LogicalVariable varKey : secondaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            i++;
         }
 
-        // generate field permutations for prev record
-        int[] prevFieldPermutation = new int[numKeys + numFilterFields];
-        int k = 0;
-        for (LogicalVariable varKey : prevSecondaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            prevFieldPermutation[k] = idx;
-            k++;
-        }
-        for (LogicalVariable varKey : primaryKeys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            prevFieldPermutation[k] = idx;
-            k++;
-        }
-        // Filter can only be one field!
-        if (numFilterFields > 0) {
-            int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
-            prevFieldPermutation[numKeys] = idx;
-        }
-
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
         String itemTypeName = dataset.getItemTypeName();
         IAType itemType;
         try {
-            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName)
+            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
                     .getDatatype();
 
             if (itemType.getTypeTag() != ATypeTag.RECORD) {
-                throw new AlgebricksException("Only record types can be indexed.");
+                throw new AlgebricksException("Only record types can be tokenized.");
             }
 
             ARecordType recType = (ARecordType) itemType;
@@ -2798,74 +2131,118 @@
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), indexName);
 
-            ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
-                    recType, context.getBinaryComparatorFactoryProvider());
-            int[] filterFields = null;
-            int[] btreeFields = null;
-            if (filterTypeTraits != null) {
-                filterFields = new int[1];
-                filterFields[0] = numKeys;
-                btreeFields = new int[numKeys];
-                for (int l = 0; l < btreeFields.length; l++) {
-                    btreeFields[l] = l;
-                }
-            }
+            List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
+            List<IAType> secondaryKeyTypeEntries = secondaryIndex.getKeyFieldTypes();
 
-            List<List<String>> secondaryKeyNames = secondaryIndex.getKeyFieldNames();
-            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-            ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
-            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
-            for (i = 0; i < secondaryKeys.size(); ++i) {
-                Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
-                        secondaryKeyNames.get(i), recType);
-                IAType keyType = keyPairType.first;
-                comparatorFactories[i] =
-                        AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
-                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            }
+            int numTokenFields = (!isPartitioned) ? secondaryKeys.size() : secondaryKeys.size() + 1;
+            ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
+            ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
+
+            // Find the key type of the secondary key. If it's a derived type,
+            // return the derived type.
+            // e.g. UNORDERED LIST -> return UNORDERED LIST type
+            IAType secondaryKeyType;
+            Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypeEntries.get(0),
+                    secondaryKeyExprs.get(0), recType);
+            secondaryKeyType = keyPairType.first;
             List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
+            i = 0;
             for (List<String> partitioningKey : partitioningKeys) {
                 IAType keyType = recType.getSubFieldType(partitioningKey);
-                comparatorFactories[i] =
-                        AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
-                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+                invListsTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
                 ++i;
             }
 
-            IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+            tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
+            if (isPartitioned) {
+                // The partitioning field is hardcoded to be a short *without*
+                // an Asterix type tag.
+                tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
+            }
+
+            IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(
+                    secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
+
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    splitProviderAndPartitionConstraintsForDataset(
-                            dataverseName, datasetName, indexName, temp);
+                    splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName,
+                            dataset.getDatasetDetails().isTemp());
 
-            // prepare callback
-            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
-            IModificationOperationCallbackFactory modificationCallbackFactory = temp
-                    ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
-                            ResourceType.LSM_BTREE)
-                    : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
-                            ResourceType.LSM_BTREE, dataset.hasMetaPart());
+            // Generate Output Record format
+            ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
+            ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
+            ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
-                    .getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
-                    new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
-                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                    storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
-                    btreeFields, filterFields, !temp);
-            AsterixLSMTreeUpsertOperatorDescriptor op = new AsterixLSMTreeUpsertOperatorDescriptor(spec, recordDesc,
-                    appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
-                    splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                    idfh, filterFactory, false, indexName, null, modificationCallbackFactory,
-                    NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation);
-            return new Pair<>(op, splitsAndConstraint.second);
+            // The order of the output record: propagated variables (including
+            // PK and SK), token, and number of token.
+            // #1. propagate all input variables
+            for (int k = 0; k < recordDesc.getFieldCount(); k++) {
+                tokenKeyPairFields[k] = recordDesc.getFields()[k];
+                tokenKeyPairTypeTraits[k] = recordDesc.getTypeTraits()[k];
+            }
+            int tokenOffset = recordDesc.getFieldCount();
+
+            // #2. Specify the token type
+            tokenKeyPairFields[tokenOffset] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
+            tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[0];
+            tokenOffset++;
+
+            // #3. Specify the length-partitioning key: number of token
+            if (isPartitioned) {
+                tokenKeyPairFields[tokenOffset] = ShortSerializerDeserializer.INSTANCE;
+                tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[1];
+            }
+
+            RecordDescriptor tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
+            IOperatorDescriptor tokenizerOp;
+
+            // Keys to be tokenized : SK
+            int docField = fieldPermutation[fieldPermutation.length - 1];
+
+            // Keys to be propagated
+            int[] keyFields = new int[numKeys];
+            for (int k = 0; k < keyFields.length; k++) {
+                keyFields[k] = k;
+            }
+
+            tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc, tokenizerFactory, docField,
+                    keyFields, isPartitioned, true);
+            return new Pair<>(tokenizerOp, splitsAndConstraint.second);
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
     }
-}
+
+    private IBinaryComparatorFactory[] getMergedComparatorFactories(IBinaryComparatorFactory[] comparatorFactories,
+            IBinaryComparatorFactory[] primaryComparatorFactories) {
+        IBinaryComparatorFactory[] btreeCompFactories;
+        int btreeCompFactoriesCount = comparatorFactories.length + primaryComparatorFactories.length;
+        btreeCompFactories = new IBinaryComparatorFactory[btreeCompFactoriesCount];
+        int i = 0;
+        for (; i < comparatorFactories.length; i++) {
+            btreeCompFactories[i] = comparatorFactories[i];
+        }
+        for (int j = 0; i < btreeCompFactoriesCount; i++, j++) {
+            btreeCompFactories[i] = primaryComparatorFactories[j];
+        }
+        return btreeCompFactories;
+    }
+
+    private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
+            IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
+            throws AlgebricksException {
+        // No filtering condition.
+        if (filterExpr == null) {
+            return null;
+        }
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+        IScalarEvaluatorFactory filterEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(filterExpr,
+                typeEnv, inputSchemas, context);
+        return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspectorFactory());
+    }
+
+    private void validateRecordType(IAType itemType) throws AlgebricksException {
+        if (itemType.getTypeTag() != ATypeTag.RECORD) {
+            throw new AlgebricksException("Only record types can be indexed.");
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
new file mode 100644
index 0000000..0b22dab
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.DatasourceAdapter;
+import org.apache.asterix.metadata.entities.Datatype;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.NodeGroup;
+import org.apache.asterix.metadata.utils.MetadataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class MetadataManagerUtil {
+
+    private MetadataManagerUtil() {
+        throw new AssertionError("This util class should not be initialized.");
+    }
+
+    public static IAType findType(MetadataTransactionContext mdTxnCtx, String dataverse, String typeName)
+            throws AlgebricksException {
+        if (dataverse == null || typeName == null) {
+            return null;
+        }
+        Datatype type;
+        try {
+            type = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverse, typeName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(
+                    "Metadata exception while looking up type '" + typeName + "' in dataverse '" + dataverse + "'", e);
+        }
+        if (type == null) {
+            throw new AlgebricksException("Type name '" + typeName + "' unknown in dataverse '" + dataverse + "'");
+        }
+        return type.getDatatype();
+    }
+
+    public static ARecordType findOutputRecordType(MetadataTransactionContext mdTxnCtx, String dataverse,
+            String outputRecordType) throws AlgebricksException {
+        if (outputRecordType == null) {
+            return null;
+        }
+        if (dataverse == null) {
+            throw new AlgebricksException("Cannot declare output-record-type with no dataverse!");
+        }
+        IAType type = findType(mdTxnCtx, dataverse, outputRecordType);
+        if (!(type instanceof ARecordType)) {
+            throw new AlgebricksException("Type " + outputRecordType + " is not a record type!");
+        }
+        return (ARecordType) type;
+    }
+
+    public static DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName,
+            String adapterName) throws MetadataException {
+        DatasourceAdapter adapter;
+        // search in default namespace (built-in adapter)
+        adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+
+        // search in dataverse (user-defined adapter)
+        if (adapter == null) {
+            adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName);
+        }
+        return adapter;
+    }
+
+    public static Dataset findDataset(MetadataTransactionContext mdTxnCtx, String dataverse, String dataset)
+            throws AlgebricksException {
+        try {
+            return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, dataset);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    public static Dataset findExistingDataset(MetadataTransactionContext mdTxnCtx, String dataverseName,
+            String datasetName) throws AlgebricksException {
+        Dataset dataset = findDataset(mdTxnCtx, dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        return dataset;
+    }
+
+    public static INodeDomain findNodeDomain(MetadataTransactionContext mdTxnCtx, String nodeGroupName)
+            throws AlgebricksException {
+        NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroupName);
+        List<String> partitions = new ArrayList<>();
+        for (String location : nodeGroup.getNodeNames()) {
+            int numPartitions = AsterixClusterProperties.INSTANCE.getNodePartitionsCount(location);
+            for (int i = 0; i < numPartitions; i++) {
+                partitions.add(location);
+            }
+        }
+        return new DefaultNodeGroupDomain(partitions);
+    }
+
+    public static Feed findFeed(MetadataTransactionContext mdTxnCtx, String dataverse, String feedName)
+            throws AlgebricksException {
+        try {
+            return MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, feedName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    public static FeedPolicyEntity findFeedPolicy(MetadataTransactionContext mdTxnCtx, String dataverse,
+            String policyName) throws AlgebricksException {
+        try {
+            return MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverse, policyName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    public static List<Index> getDatasetIndexes(MetadataTransactionContext mdTxnCtx, String dataverseName,
+            String datasetName) throws AlgebricksException {
+        try {
+            return MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    public static AqlDataSource findDataSource(MetadataTransactionContext mdTxnCtx, AqlSourceId id)
+            throws AlgebricksException {
+        AqlSourceId aqlId = id;
+        try {
+            return lookupSourceInMetadata(mdTxnCtx, aqlId);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    public static AqlDataSource lookupSourceInMetadata(MetadataTransactionContext mdTxnCtx, AqlSourceId aqlId)
+            throws AlgebricksException {
+        Dataset dataset = findDataset(mdTxnCtx, aqlId.getDataverseName(), aqlId.getDatasourceName());
+        if (dataset == null) {
+            throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
+        }
+        IAType itemType = findType(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+        IAType metaItemType = findType(mdTxnCtx, dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+        INodeDomain domain = findNodeDomain(mdTxnCtx, dataset.getNodeGroupName());
+        byte datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL) ? AqlDataSourceType.EXTERNAL_DATASET
+                : AqlDataSourceType.INTERNAL_DATASET;
+        return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, datasourceType,
+                dataset.getDatasetDetails(), domain);
+    }
+}
\ No newline at end of file