Cleaned up redundant code by sharing, and removed some dead code.

In AqlMetadataProvider, factored the duplicated bodies of getInsertRuntime() and
getDeleteRuntime() into a shared getInsertOrDeleteRuntime(), and likewise
getIndexInsertRuntime() and getIndexDeleteRuntime() into a shared
getIndexInsertOrDeleteRuntime(), both parameterized by IndexOp. In
AqlCompiledMetadataDeclarations, removed the dead
computePartitioningEvaluatorFactories() and its now-unused imports. Also added
braces to single-statement if/else bodies, dropped stale TODOs, and fixed
indentation and line wrapping.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_fix_issue_96@449 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
index 23f1e77..ba7c797 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
@@ -36,16 +36,12 @@
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
-import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
@@ -77,10 +73,11 @@
         this.dataverseName = dataverseName;
         this.outputFile = outputFile;
         this.config = config;
-        if (stores == null && online)
+        if (stores == null && online) {
             this.stores = AsterixProperties.INSTANCE.getStores();
-        else
+        } else {
             this.stores = stores;
+        }
         this.types = types;
         this.typeDataGenMap = typeDataGenMap;
         this.writerFactory = writerFactory;
@@ -109,17 +106,16 @@
     }
 
     public void disconnectFromDataverse() throws AlgebricksException {
-        if (!isConnected)
+        if (!isConnected) {
             throw new AlgebricksException("You are not connected to any dataverse");
-        else {
-            dataverseName = null;
-            format = null;
-            isConnected = false;
         }
+        dataverseName = null;
+        format = null;
+        isConnected = false;
     }
 
     public boolean isConnectedToDataverse() {
-        return this.isConnected;
+        return isConnected;
     }
 
     public String getDataverseName() {
@@ -131,8 +127,9 @@
     }
 
     public IDataFormat getFormat() throws AlgebricksException {
-        if (!isConnected)
+        if (!isConnected) {
             throw new AlgebricksException("You need to connect to a dataverse first.");
+        }
         return format;
     }
 
@@ -190,8 +187,6 @@
         }
     }
     
-    // TODO: Check if this is correct. Not sure what the index name of the primary index is.
-    // TODO: Rename this to getPrimaryIndex().
     public Index getDatasetPrimaryIndex(String dataverseName, String datasetName) throws AlgebricksException {
         try {
             return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
@@ -212,22 +207,6 @@
         this.outputFile = outputFile;
     }
 
-    public List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> computePartitioningEvaluatorFactories(
-            List<String> partitioningExprs, ARecordType recType) {
-        List<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>> evalFactories = new ArrayList<Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType>>(
-                partitioningExprs.size());
-        for (String expr : partitioningExprs) {
-            Triple<ICopyEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFact = null;
-            try {
-                evalFact = format.partitioningEvaluatorFactory(recType, expr);
-            } catch (AlgebricksException e) {
-                throw new IllegalStateException(e);
-            }
-            evalFactories.add(evalFact);
-        }
-        return evalFactories;
-    }
-
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
             String datasetName, String targetIdxName) throws AlgebricksException {
         FileSplit[] splits = splitsForInternalOrFeedDataset(datasetName, targetIdxName);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index f9b5f1d..a3d3472 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -19,7 +19,6 @@
 import java.util.List;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
-import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
 import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
@@ -122,7 +121,7 @@
             List<LogicalVariable> projectVariables, boolean projectPushed, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
             throws AlgebricksException {
-    	Dataset dataset = metadata.findDataset(dataSource.getId().getDatasetName());
+        Dataset dataset = metadata.findDataset(dataSource.getId().getDatasetName());
         if (dataset == null) {
             throw new AlgebricksException("Unknown dataset " + dataSource.getId().getDatasetName());
         }
@@ -178,7 +177,7 @@
         if (itemType.getTypeTag() != ATypeTag.RECORD) {
             throw new AlgebricksException("Can only scan datasets of records.");
         }
-        
+
         IDatasourceReadAdapter adapter;
         try {
             adapter = (IDatasourceReadAdapter) Class.forName(datasetDetails.getAdapter()).newInstance();
@@ -204,7 +203,7 @@
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
         ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
-        		datasetDetails.getAdapter(), datasetDetails.getProperties(), rt, scannerDesc);
+                datasetDetails.getAdapter(), datasetDetails.getProperties(), rt, scannerDesc);
         dataScanner.setDatasourceAdapter(adapter);
         AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
@@ -279,8 +278,8 @@
     public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
             AqlCompiledMetadataDeclarations metadata, JobGenContext context, boolean retainInput, String datasetName,
-            Dataset dataset, String indexName, int[] lowKeyFields, int[] highKeyFields,
-            boolean lowKeyInclusive, boolean highKeyInclusive) throws AlgebricksException {
+            Dataset dataset, String indexName, int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
+            boolean highKeyInclusive) throws AlgebricksException {
         boolean isSecondary = true;
         Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
         if (primaryIndex != null) {
@@ -318,8 +317,7 @@
     @SuppressWarnings("rawtypes")
     public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(
             AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
-            String datasetName, Dataset dataset, String indexName, int[] keyFields)
-            throws AlgebricksException {
+            String datasetName, Dataset dataset, String indexName, int[] keyFields) throws AlgebricksException {
         ARecordType recType = (ARecordType) metadata.findType(dataset.getItemTypeName());
         boolean isSecondary = true;
         Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
@@ -351,8 +349,7 @@
                             + numSecondaryKeys
                             + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
         }
-        Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(
-                secondaryKeyFields.get(0), recType);
+        Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), recType);
         IAType keyType = keyTypePair.first;
         if (keyType == null) {
             throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
@@ -388,7 +385,8 @@
         }
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
         RecordDescriptor recDesc = new RecordDescriptor(recordFields);
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
         RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, recDesc,
                 appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(), spPc.first, typeTraits,
                 comparatorFactories, keyFields, new RTreeDataflowHelperFactory(valueProviderFactories), false,
@@ -490,11 +488,11 @@
 
         String itemTypeName = dataset.getItemTypeName();
         ARecordType itemType = (ARecordType) metadata.findType(itemTypeName);
-        
-        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);       
-        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                dataset, itemType, context.getBinaryComparatorFactoryProvider());
-                
+
+        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
+        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+                itemType, context.getBinaryComparatorFactoryProvider());
+
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
@@ -506,98 +504,65 @@
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
     }
 
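+    // Shared by getInsertRuntime() and getDeleteRuntime(): builds the primary-index
+    // TreeIndexInsertUpdateDeleteOperatorDescriptor for the given IndexOp
+    // (INSERT or DELETE), with the key fields moved in front of the payload.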
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOp indexOp,
+            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+            LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+            throws AlgebricksException {
+        String datasetName = dataSource.getId().getDatasetName();
+        int numKeys = keys.size();
+        // Move key fields to front.
+        int[] fieldPermutation = new int[numKeys + 1];
+        int i = 0;
+        for (LogicalVariable varKey : keys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            i++;
+        }
+        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+
+        Dataset dataset = metadata.findDataset(datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException("Unknown dataset " + datasetName);
+        }
+        Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
+        String indexName = primaryIndex.getIndexName();
+
+        String itemTypeName = dataset.getItemTypeName();
+        ARecordType itemType = (ARecordType) metadata.findType(itemTypeName);
+
+        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
+
+        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
+                itemType, context.getBinaryComparatorFactoryProvider());
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+        TreeIndexInsertUpdateDeleteOperatorDescriptor insertDeleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
+                splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
+                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackProvider.INSTANCE, txnId);
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp, splitsAndConstraint.second);
+    }
+
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
             IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
             LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
             throws AlgebricksException {
-        String datasetName = dataSource.getId().getDatasetName();
-        int numKeys = keys.size();
-        // move key fields to front
-        int[] fieldPermutation = new int[numKeys + 1];
-        // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
-        int i = 0;
-        for (LogicalVariable varKey : keys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            i++;
-        }
-        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
-
-        Dataset dataset = metadata.findDataset(datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
-        String indexName = primaryIndex.getIndexName();
-
-        String itemTypeName = dataset.getItemTypeName();
-        ARecordType itemType = (ARecordType) metadata.findType(itemTypeName);
-        
-        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
-
-        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-
-        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                dataset, itemType, context.getBinaryComparatorFactoryProvider());
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
-        TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
-                splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, IndexOp.INSERT,
-                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackProvider.INSTANCE, txnId);
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
+        return getInsertOrDeleteRuntime(IndexOp.INSERT, dataSource, propagatedSchema, keys, payload, recordDesc,
+                context, spec);
     }
 
-    // TODO: Seems like we can share the code with getInsertRuntime().
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
             IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
             LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
             throws AlgebricksException {
-        String datasetName = dataSource.getId().getDatasetName();
-        int numKeys = keys.size();
-        // move key fields to front
-        int[] fieldPermutation = new int[numKeys + 1];
-        // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
-        int i = 0;
-        for (LogicalVariable varKey : keys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            i++;
-        }
-        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
-
-        Dataset dataset = metadata.findDataset(datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
-        String indexName = primaryIndex.getIndexName();
-
-        String itemTypeName = dataset.getItemTypeName();
-        ARecordType itemType = (ARecordType) metadata.findType(itemTypeName);
-
-        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
-
-        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-
-        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
-                dataset, itemType, context.getBinaryComparatorFactoryProvider());
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
-        TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
-                splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, IndexOp.DELETE,
-                new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackProvider.INSTANCE, txnId);
-        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
+        return getInsertOrDeleteRuntime(IndexOp.DELETE, dataSource, propagatedSchema, keys, payload, recordDesc,
+                context, spec);
     }
 
-    // TODO: Seems like we can share this code with getIndexDeleteRuntime().
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
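+    // Shared by getIndexInsertRuntime() and getIndexDeleteRuntime(): dispatches on
+    // the secondary index type (BTREE or RTREE) and builds the corresponding DML
+    // runtime for the given IndexOp.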
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteRuntime(IndexOp indexOp,
             IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
@@ -610,36 +575,40 @@
         }
         Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
         AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
-        if (secondaryIndex.getIndexType() == IndexType.BTREE) {
-            return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
-                    filterFactory, recordDesc, context, spec, IndexOp.INSERT);
-        } else {
-            return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
-                    filterFactory, recordDesc, context, spec, IndexOp.INSERT);
+        switch (secondaryIndex.getIndexType()) {
+            case BTREE: {
+                return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
+                        filterFactory, recordDesc, context, spec, indexOp);
+            }
+            case RTREE: {
+                return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
+                        filterFactory, recordDesc, context, spec, indexOp);
+            }
+            default: {
+                throw new AlgebricksException("Insert and delete not implemented for index type: "
+                        + secondaryIndex.getIndexType());
+            }
         }
     }
 
     @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
+            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec) throws AlgebricksException {
+        return getIndexInsertOrDeleteRuntime(IndexOp.INSERT, dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
+                primaryKeys, secondaryKeys, filterExpr, recordDesc, context, spec);
+    }
+
+    @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
             IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
             JobGenContext context, JobSpecification spec) throws AlgebricksException {
-        String indexName = dataSourceIndex.getId();
-        String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
-        Dataset dataset = metadata.findDataset(datasetName);
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + datasetName);
-        }
-        Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
-        AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
-        if (secondaryIndex.getIndexType() == IndexType.BTREE) {
-            return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
-                    filterFactory, recordDesc, context, spec, IndexOp.DELETE);
-        } else {
-            return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
-                    filterFactory, recordDesc, context, spec, IndexOp.DELETE);
-        }
+        return getIndexInsertOrDeleteRuntime(IndexOp.DELETE, dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
+                primaryKeys, secondaryKeys, filterExpr, recordDesc, context, spec);
     }
 
     private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
@@ -691,8 +660,8 @@
         ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
         for (i = 0; i < secondaryKeys.size(); ++i) {
-            Pair<IAType, Boolean> keyPairType = Index.getNonNullableKeyFieldType(secondaryKeyExprs
-                    .get(i).toString(), recType);
+            Pair<IAType, Boolean> keyPairType = Index.getNonNullableKeyFieldType(secondaryKeyExprs.get(i).toString(),
+                    recType);
             IAType keyType = keyPairType.first;
             comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
                     true);
@@ -730,8 +699,7 @@
         ARecordType recType = (ARecordType) itemType;
         Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
         List<String> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
-		Pair<IAType, Boolean> keyPairType = Index.getNonNullableKeyFieldType(
-				secondaryKeyExprs.get(0), recType);
+        Pair<IAType, Boolean> keyPairType = Index.getNonNullableKeyFieldType(secondaryKeyExprs.get(0), recType);
         IAType spatialType = keyPairType.first;
         int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
         int numSecondaryKeys = dimension * 2;
@@ -792,5 +760,4 @@
     public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
         return AsterixBuiltinFunctions.lookupFunction(fid);
     }
-
 }