[ASTERIXDB-2791][IDX] Make BTree secondary indexes accept NULLs/MISSINGs

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
This patch is to allow BTree secondary indexes to store
NULLs and MISSINGs

Change-Id: I3342caa38f52d8d7019bbcd5bf81fc0cc01ca110
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12065
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index e567974..97954eb 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -629,8 +629,8 @@
             }
 
             if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
-                indexUpdate.setUpsertIndicatorExpr(new MutableObject<>(
-                        new VariableReferenceExpression(primaryIndexModificationOp.getUpsertIndicatorVar())));
+                indexUpdate.setOperationExpr(new MutableObject<>(
+                        new VariableReferenceExpression(primaryIndexModificationOp.getOperationVar())));
             }
 
             context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 16d5878..c053ab9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -513,8 +513,8 @@
             upsertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadVarRef, varRefsForLoading,
                     Collections.singletonList(new MutableObject<>(metaVarRef)), InsertDeleteUpsertOperator.Kind.UPSERT,
                     false);
-            upsertOp.setUpsertIndicatorVar(context.newVar());
-            upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
+            upsertOp.setOperationVar(context.newVar());
+            upsertOp.setOperationVarType(BuiltinType.AINT8);
             // Create and add a new variable used for representing the original record
             upsertOp.setPrevRecordVar(context.newVar());
             upsertOp.setPrevRecordType(targetDatasource.getItemType());
@@ -567,8 +567,8 @@
             upsertOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
             upsertOp.setAdditionalFilteringExpressions(filterExprs);
             upsertOp.setSourceLocation(sourceLoc);
-            upsertOp.setUpsertIndicatorVar(context.newVar());
-            upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
+            upsertOp.setOperationVar(context.newVar());
+            upsertOp.setOperationVarType(BuiltinType.AINT8);
             // Create and add a new variable used for representing the original record
             upsertOp.setPrevRecordVar(context.newVar());
             upsertOp.setPrevRecordType(recordType);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
index 2a22ac6..1ed57c0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
@@ -24,6 +24,8 @@
 
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -98,7 +100,11 @@
         recordBuilder.append("{\"values\":[");
         for (int j = 0; j < tuple.getFieldCount(); ++j) {
             bbis.setByteBuffer(ByteBuffer.wrap(tuple.getFieldData(j)), tuple.getFieldStart(j));
-            recordBuilder.append(secondaryRecDesc.getFields()[j].deserialize(dis));
+            IAObject field = (IAObject) secondaryRecDesc.getFields()[j].deserialize(dis);
+            if (field.getType().getTypeTag() == ATypeTag.MISSING) {
+                continue;
+            }
+            recordBuilder.append(field);
             recordBuilder.append(",");
         }
         recordBuilder.deleteCharAt(recordBuilder.length() - 1);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index ccb73b6..fcdc25a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -389,7 +389,7 @@
 
     // Gets the primary key permutation for upserts.
     private static int[] getPrimaryKeyPermutationForUpsert(Dataset dataset) {
-        // upsertIndicatorVar + prev record
+        // (upsert) operationVar + prev record
         int f = 2;
         // add the previous meta second
         if (dataset.hasMetaPart()) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/index-bad-fields/index-bad-fields.004.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/index-bad-fields/index-bad-fields.004.adm
index ac11618..272c3c8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/index-bad-fields/index-bad-fields.004.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/index-bad-fields/index-bad-fields.004.adm
@@ -1 +1,2 @@
+{ "values": [ 1 ] }
 { "values": [ 95, 2 ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.4.adm
index fab2047..4725f37 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.4.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.4.adm
@@ -1,3 +1,7 @@
+{ "values": [ null, 1 ] }
+{ "values": [ null, 2 ] }
+{ "values": [ 3 ] }
+{ "values": [ 4 ] }
 { "values": [ 555, 5 ] }
 { "values": [ 888, 8 ] }
 { "values": [ 999, 9 ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.5.adm
index fab2047..8d89e67 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.5.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.5.adm
@@ -1,3 +1,9 @@
+{ "values": [ null, 1 ] }
+{ "values": [ null, 2 ] }
+{ "values": [ null, 6 ] }
+{ "values": [ 3 ] }
+{ "values": [ 4 ] }
+{ "values": [ 7 ] }
 { "values": [ 555, 5 ] }
 { "values": [ 888, 8 ] }
 { "values": [ 999, 9 ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.8.adm
index 85c91e6..50f4b46 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.8.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.8.adm
@@ -1,3 +1,9 @@
+{ "values": [ null, 1 ] }
+{ "values": [ null, 7 ] }
+{ "values": [ null, 9 ] }
+{ "values": [ 3 ] }
+{ "values": [ 6 ] }
+{ "values": [ 8 ] }
 { "values": [ 222, 2 ] }
 { "values": [ 444, 4 ] }
 { "values": [ 555, 5 ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 878c94e..832f3e9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -38,7 +38,7 @@
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final long INITIAL_CHECKPOINT_ID = 0;
     // TODO(mblow): remove this marker & related logic once we no longer are able to read indexes prior to the fix
-    private static final long HAS_NULL_MISSING_VALUES_FIX = -2;
+    private static final long HAS_NULL_MISSING_VALUES_FIX = -3;
     private long id;
     private long validComponentSequence;
     private long lowWatermark;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 1721975..d970b3a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -69,8 +69,8 @@
 import org.apache.asterix.external.provider.AdapterFactoryProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.formats.base.IDataFormat;
-import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryIntegerInspector;
 import org.apache.asterix.formats.nontagged.LinearizeComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
 import org.apache.asterix.metadata.MetadataManager;
@@ -803,12 +803,12 @@
             IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
-            ILogicalExpression filterExpr, LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys,
+            ILogicalExpression filterExpr, LogicalVariable operationVar, List<LogicalVariable> prevSecondaryKeys,
             LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context,
             JobSpecification spec, List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException {
         return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema,
                 inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, recordDesc,
-                context, spec, false, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKey,
+                context, spec, false, operationVar, prevSecondaryKeys, prevAdditionalFilteringKey,
                 secondaryKeysPipelines, null);
     }
 
@@ -1213,7 +1213,7 @@
             List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
             List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr,
             RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec, boolean bulkload,
-            LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable operationVar, List<LogicalVariable> prevSecondaryKeys,
             LogicalVariable prevAdditionalFilteringKey, List<List<AlgebricksPipeline>> secondaryKeysPipelines,
             IOperatorSchema pipelineTopSchema) throws AlgebricksException {
         String indexName = dataSourceIndex.getId();
@@ -1246,30 +1246,30 @@
             case BTREE:
                 return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
                         secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp,
-                        bulkload, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
+                        bulkload, operationVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
             case ARRAY:
                 if (bulkload) {
                     // In the case of bulk-load, we do not handle any nested plans. We perform the exact same behavior
                     // as a normal B-Tree bulk load.
                     return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
                             secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec,
-                            indexOp, bulkload, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
+                            indexOp, bulkload, operationVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
                 } else {
                     return getArrayIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
-                            additionalNonKeyFields, inputRecordDesc, spec, indexOp, upsertIndicatorVar,
+                            additionalNonKeyFields, inputRecordDesc, spec, indexOp, operationVar,
                             secondaryKeysPipelines);
                 }
             case RTREE:
                 return getRTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
                         secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp,
-                        bulkload, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
+                        bulkload, operationVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
             case SINGLE_PARTITION_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
             case LENGTH_PARTITIONED_WORD_INVIX:
             case LENGTH_PARTITIONED_NGRAM_INVIX:
                 return getInvertedIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
                         secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp,
-                        secondaryIndex.getIndexType(), bulkload, upsertIndicatorVar, prevSecondaryKeys,
+                        secondaryIndex.getIndexType(), bulkload, operationVar, prevSecondaryKeys,
                         prevAdditionalFilteringKeys);
             default:
                 throw new AlgebricksException(
@@ -1281,7 +1281,7 @@
             String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
             AsterixTupleFilterFactory filterFactory, RecordDescriptor inputRecordDesc, JobGenContext context,
-            JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable upsertIndicatorVar,
+            JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable operationVar,
             List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
             throws AlgebricksException {
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
@@ -1350,10 +1350,10 @@
                         StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null,
                         BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory);
             } else if (indexOp == IndexOperation.UPSERT) {
-                int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar);
+                int operationFieldIndex = propagatedSchema.findVariable(operationVar);
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
-                        filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex,
-                        BinaryBooleanInspector.FACTORY, prevFieldPermutation);
+                        filterFactory, modificationCallbackFactory, operationFieldIndex, BinaryIntegerInspector.FACTORY,
+                        prevFieldPermutation);
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
                         filterFactory, false, modificationCallbackFactory);
@@ -1367,8 +1367,8 @@
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getArrayIndexRuntime(DataverseName dataverseName,
             String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> additionalNonKeyFields, RecordDescriptor inputRecordDesc, JobSpecification spec,
-            IndexOperation indexOp, LogicalVariable upsertIndicatorVar,
-            List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException {
+            IndexOperation indexOp, LogicalVariable operationVar, List<List<AlgebricksPipeline>> secondaryKeysPipelines)
+            throws AlgebricksException {
 
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
         int numPrimaryKeys = primaryKeys.size();
@@ -1404,9 +1404,9 @@
                     storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
             IOperatorDescriptor op;
             if (indexOp == IndexOperation.UPSERT) {
-                int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar);
+                int operationFieldIndex = propagatedSchema.findVariable(operationVar);
                 op = new LSMSecondaryUpsertWithNestedPlanOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
-                        idfh, modificationCallbackFactory, upsertIndicatorFieldIndex, BinaryBooleanInspector.FACTORY,
+                        idfh, modificationCallbackFactory, operationFieldIndex, BinaryIntegerInspector.FACTORY,
                         secondaryKeysPipelines.get(0), secondaryKeysPipelines.get(1));
             } else {
                 op = new LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor(spec, inputRecordDesc,
@@ -1422,7 +1422,7 @@
             String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
             AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable upsertIndicatorVar,
+            JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable operationVar,
             List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
             throws AlgebricksException {
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
@@ -1505,10 +1505,10 @@
                     StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false,
                     indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory);
         } else if (indexOp == IndexOperation.UPSERT) {
-            int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar);
+            int operationFieldIndex = propagatedSchema.findVariable(operationVar);
             op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation,
-                    indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex,
-                    BinaryBooleanInspector.FACTORY, prevFieldPermutation);
+                    indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, operationFieldIndex,
+                    BinaryIntegerInspector.FACTORY, prevFieldPermutation);
         } else {
             op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
                     indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory);
@@ -1521,7 +1521,7 @@
             List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
             List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
             RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
-            IndexType indexType, boolean bulkload, LogicalVariable upsertIndicatorVar,
+            IndexType indexType, boolean bulkload, LogicalVariable operationVar,
             List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
             throws AlgebricksException {
         // Check the index is length-partitioned or not.
@@ -1536,7 +1536,7 @@
         // Sanity checks.
         if (primaryKeys.size() > 1) {
             throw new AlgebricksException(
-                    "Cannot create inverted index on " + dataset(PLURAL) + "with composite primary key.");
+                    "Cannot create inverted index on " + dataset(PLURAL) + " with composite primary key.");
         }
         // The size of secondaryKeys can be two if it receives input from its
         // TokenizeOperator- [token, number of token]
@@ -1618,10 +1618,10 @@
                         StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory,
                         null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory);
             } else if (indexOp == IndexOperation.UPSERT) {
-                int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar);
+                int upsertOperationFieldIndex = propagatedSchema.findVariable(operationVar);
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory,
-                        filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex,
-                        BinaryBooleanInspector.FACTORY, prevFieldPermutation);
+                        filterFactory, modificationCallbackFactory, upsertOperationFieldIndex,
+                        BinaryIntegerInspector.FACTORY, prevFieldPermutation);
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
                         indexDataFlowFactory, filterFactory, false, modificationCallbackFactory);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 2e10d77..158efb2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -475,9 +475,9 @@
         IDataFormat dataFormat = metadataProvider.getDataFormat();
 
         int f = 0;
-        // add the upsert indicator var
-        outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(BuiltinType.ABOOLEAN);
-        outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(BuiltinType.ABOOLEAN);
+        // add the upsert operation var
+        outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(BuiltinType.AINT8);
+        outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(BuiltinType.AINT8);
         f++;
         // add the previous record
         outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(itemType);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index 33f5b62..9645e7a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -89,12 +89,6 @@
             AlgebricksMetaOperatorDescriptor asterixAssignOp =
                     createExternalAssignOp(spec, indexDetails.getKeyFieldNames().size(), secondaryRecDesc);
 
-            // If any of the secondary fields are nullable, then add a select op that filters nulls.
-            AlgebricksMetaOperatorDescriptor selectOp = null;
-            if (anySecondaryKeyIsNullable || isOverridingKeyFieldTypes) {
-                selectOp = createFilterNullsSelectOp(spec, indexDetails.getKeyFieldNames().size(), secondaryRecDesc);
-            }
-
             // Sort by secondary keys.
             ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
             // Create secondary BTree bulk load op.
@@ -117,12 +111,7 @@
             spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
             root = metaOp;
             spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
-            if (anySecondaryKeyIsNullable || isOverridingKeyFieldTypes) {
-                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
-                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
-            } else {
-                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
-            }
+            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
             spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
             spec.addRoot(root);
             spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
@@ -149,13 +138,6 @@
             spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
 
             sourceOp = targetOp;
-            if (anySecondaryKeyIsNullable || isOverridingKeyFieldTypes) {
-                // if any of the secondary fields are nullable, then add a select op that filters nulls.
-                // assign op ----> select op
-                targetOp = createFilterNullsSelectOp(spec, indexDetails.getKeyFieldNames().size(), secondaryRecDesc);
-                spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
-                sourceOp = targetOp;
-            }
 
             // no need to sort if the index is secondary primary index
             if (!indexDetails.getKeyFieldNames().isEmpty()) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 8e0de5f..591ff9a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -30,7 +30,7 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
-import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.AInt8;
 import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -80,6 +80,9 @@
 
 public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
+    public static final AInt8 UPSERT_NEW = new AInt8((byte) 0);
+    public static final AInt8 UPSERT_EXISTING = new AInt8((byte) 1);
+    public static final AInt8 DELETE_EXISTING = new AInt8((byte) 2);
     private static final Logger LOGGER = LogManager.getLogger();
     private static final ThreadLocal<DateFormat> DATE_FORMAT =
             ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
@@ -172,21 +175,22 @@
                             if (cursor.hasNext()) {
                                 cursor.next();
                                 prevTuple = cursor.getTuple();
-                                appendUpsertIndicator(!isDelete);
+                                appendOperationIndicator(!isDelete, true);
                                 appendFilterToPrevTuple();
                                 appendPrevRecord();
                                 appendPreviousMeta();
                                 appendFilterToOutput();
                             } else {
-                                appendUpsertIndicator(!isDelete);
+                                appendOperationIndicator(!isDelete, false);
                                 appendPreviousTupleAsMissing();
                             }
                         } finally {
                             cursor.close(); // end the search
                         }
                     } else {
+                        // simple upsert into a non-filtered dataset having no secondary indexes
                         searchCallback.before(key); // lock
-                        appendUpsertIndicator(!isDelete);
+                        appendOperationIndicator(true, false);
                         appendPreviousTupleAsMissing();
                     }
                     beforeModification(tuple);
@@ -353,8 +357,17 @@
         }
     }
 
-    protected void appendUpsertIndicator(boolean isUpsert) throws IOException {
-        recordDesc.getFields()[0].serialize(isUpsert ? ABoolean.TRUE : ABoolean.FALSE, dos);
+    @SuppressWarnings("unchecked") // using serializer
+    protected void appendOperationIndicator(boolean isUpsert, boolean prevTupleExists) throws IOException {
+        if (isUpsert) {
+            if (prevTupleExists) {
+                recordDesc.getFields()[0].serialize(UPSERT_EXISTING, dos);
+            } else {
+                recordDesc.getFields()[0].serialize(UPSERT_NEW, dos);
+            }
+        } else {
+            recordDesc.getFields()[0].serialize(DELETE_EXISTING, dos);
+        }
         tb.addFieldEndOffset();
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
index a4b4012..3231162 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
@@ -19,7 +19,7 @@
 package org.apache.asterix.runtime.operators;
 
 import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -33,21 +33,21 @@
 
 public class LSMSecondaryUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final int[] prevValuePermutation;
-    protected final int upsertIndicatorFieldIndex;
-    protected final IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory;
+    protected final int operationFieldIndex;
+    protected final IBinaryIntegerInspectorFactory operationInspectorFactory;
 
     public LSMSecondaryUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
             ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory,
-            int upsertIndicatorFieldIndex, IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory,
+            int operationFieldIndex, IBinaryIntegerInspectorFactory operationInspectorFactory,
             int[] prevValuePermutation) {
         super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, indexHelperFactory, tupleFilterFactory, false,
                 modificationOpCallbackFactory);
         this.prevValuePermutation = prevValuePermutation;
-        this.upsertIndicatorFieldIndex = upsertIndicatorFieldIndex;
-        this.upsertIndicatorInspectorFactory = upsertIndicatorInspectorFactory;
+        this.operationFieldIndex = operationFieldIndex;
+        this.operationInspectorFactory = operationInspectorFactory;
     }
 
     @Override
@@ -55,7 +55,7 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         RecordDescriptor intputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new LSMSecondaryUpsertOperatorNodePushable(ctx, partition, indexHelperFactory, modCallbackFactory,
-                tupleFilterFactory, fieldPermutation, intputRecDesc, upsertIndicatorFieldIndex,
-                upsertIndicatorInspectorFactory, prevValuePermutation);
+                tupleFilterFactory, fieldPermutation, intputRecDesc, operationFieldIndex, operationInspectorFactory,
+                prevValuePermutation);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index b588323..482be06 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -19,14 +19,13 @@
 package org.apache.asterix.runtime.operators;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.TypeTagUtil;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -48,38 +47,32 @@
  * -If old secondary index tuple == new secondary index tuple
  * --do nothing
  * -else
- * --If any old field is null/missing?
- * ---do nothing
- * --else
- * ---delete old secondary index tuple
- * --If any new field is null/missing?
- * ---do nothing
- * --else
- * ---insert new secondary index tuple
+ * --perform the operation based on the operation kind
  */
 public class LSMSecondaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
-    private final PermutingFrameTupleReference prevValueTuple = new PermutingFrameTupleReference();
-    private final int numberOfFields;
-    private final boolean isPrimaryKeyIndex;
+    protected static final int UPSERT_NEW = LSMPrimaryUpsertOperatorNodePushable.UPSERT_NEW.getByteValue();
+    protected static final int UPSERT_EXISTING = LSMPrimaryUpsertOperatorNodePushable.UPSERT_EXISTING.getByteValue();
+    protected static final int DELETE_EXISTING = LSMPrimaryUpsertOperatorNodePushable.DELETE_EXISTING.getByteValue();
 
-    protected final int upsertIndicatorFieldIndex;
-    protected final IBinaryBooleanInspector upsertIndicatorInspector;
+    private final PermutingFrameTupleReference prevTuple = new PermutingFrameTupleReference();
+    private final int numberOfFields;
+
+    protected final int operationFieldIndex;
+    protected final IBinaryIntegerInspector operationInspector;
     protected AbstractIndexModificationOperationCallback abstractModCallback;
 
     public LSMSecondaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, IModificationOperationCallbackFactory modCallbackFactory,
             ITupleFilterFactory tupleFilterFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
-            int upsertIndicatorFieldIndex, IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory,
-            int[] prevValuePermutation) throws HyracksDataException {
+            int operationFieldIndex, IBinaryIntegerInspectorFactory operationInspectorFactory,
+            int[] prevTuplePermutation) throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
                 modCallbackFactory, tupleFilterFactory);
-        this.prevValueTuple.setFieldPermutation(prevValuePermutation);
-        this.upsertIndicatorFieldIndex = upsertIndicatorFieldIndex;
-        this.upsertIndicatorInspector = upsertIndicatorInspectorFactory.createBinaryBooleanInspector(ctx);
+        this.prevTuple.setFieldPermutation(prevTuplePermutation);
+        this.operationFieldIndex = operationFieldIndex;
+        this.operationInspector = operationInspectorFactory.createBinaryIntegerInspector(ctx);
         this.numberOfFields = fieldPermutation.length;
-        // a primary key index only has primary keys, and thus these two permutations are the same
-        this.isPrimaryKeyIndex = Arrays.equals(fieldPermutation, prevValuePermutation);
     }
 
     @Override
@@ -97,36 +90,24 @@
         for (int i = 0; i < tupleCount; i++) {
             try {
                 frameTuple.reset(accessor, i);
-                boolean isUpsert =
-                        upsertIndicatorInspector.getBooleanValue(frameTuple.getFieldData(upsertIndicatorFieldIndex),
-                                frameTuple.getFieldStart(upsertIndicatorFieldIndex),
-                                frameTuple.getFieldLength(upsertIndicatorFieldIndex));
-                // if both previous value and new value are null, then we skip
+                int operation = operationInspector.getIntegerValue(frameTuple.getFieldData(operationFieldIndex),
+                        frameTuple.getFieldStart(operationFieldIndex), frameTuple.getFieldLength(operationFieldIndex));
                 tuple.reset(accessor, i);
-                prevValueTuple.reset(accessor, i);
+                prevTuple.reset(accessor, i);
 
-                boolean newTupleHasNullOrMissing = hasNullOrMissing(tuple);
-                boolean oldTupleHasNullOrMissing = hasNullOrMissing(prevValueTuple);
-                if (newTupleHasNullOrMissing && oldTupleHasNullOrMissing) {
-                    // No op
-                    continue;
-                }
-                // At least, one is not null
-                if (!isPrimaryKeyIndex && TupleUtils.equalTuples(tuple, prevValueTuple, numberOfFields)) {
-                    // For a secondary index, if the secondary key values do not change, we can skip upserting it.
-                    // However, for a primary key index, we cannot do this because it only contains primary keys
-                    // which are always the same
-                    continue;
-                }
-                // if all old fields are known values, then delete. skip deleting if any is null or missing
-                if (!oldTupleHasNullOrMissing) {
-                    abstractModCallback.setOp(Operation.DELETE);
-                    lsmAccessor.forceDelete(prevValueTuple);
-                }
-                // if all new fields are known values, then insert. skip inserting if any is null or missing
-                if (isUpsert && !newTupleHasNullOrMissing) {
+                if (operation == UPSERT_NEW) {
                     abstractModCallback.setOp(Operation.INSERT);
                     lsmAccessor.forceInsert(tuple);
+                } else if (operation == UPSERT_EXISTING) {
+                    if (!TupleUtils.equalTuples(tuple, prevTuple, numberOfFields)) {
+                        abstractModCallback.setOp(Operation.DELETE);
+                        lsmAccessor.forceDelete(prevTuple);
+                        abstractModCallback.setOp(Operation.INSERT);
+                        lsmAccessor.forceInsert(tuple);
+                    }
+                } else if (operation == DELETE_EXISTING) {
+                    abstractModCallback.setOp(Operation.DELETE);
+                    lsmAccessor.forceDelete(prevTuple);
                 }
             } catch (Exception e) {
                 throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java
index d077987..bbf0af1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java
@@ -20,7 +20,7 @@
 
 import java.util.List;
 
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -39,11 +39,11 @@
 
     public LSMSecondaryUpsertWithNestedPlanOperatorDescriptor(JobSpecification spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
-            IModificationOperationCallbackFactory modCallbackFactory, int upsertIndicatorFieldIndex,
-            IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory,
-            List<AlgebricksPipeline> secondaryKeysPipeline, List<AlgebricksPipeline> prevSecondaryKeysPipeline) {
-        super(spec, outRecDesc, fieldPermutation, indexHelperFactory, null, modCallbackFactory,
-                upsertIndicatorFieldIndex, upsertIndicatorInspectorFactory, null);
+            IModificationOperationCallbackFactory modCallbackFactory, int operationFieldIndex,
+            IBinaryIntegerInspectorFactory operationInspectorFactory, List<AlgebricksPipeline> secondaryKeysPipeline,
+            List<AlgebricksPipeline> prevSecondaryKeysPipeline) {
+        super(spec, outRecDesc, fieldPermutation, indexHelperFactory, null, modCallbackFactory, operationFieldIndex,
+                operationInspectorFactory, null);
         this.secondaryKeysPipeline = secondaryKeysPipeline;
         this.prevSecondaryKeysPipeline = prevSecondaryKeysPipeline;
     }
@@ -53,7 +53,7 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new LSMSecondaryUpsertWithNestedPlanOperatorNodePushable(ctx, partition, indexHelperFactory,
-                modCallbackFactory, fieldPermutation, inputRecDesc, upsertIndicatorFieldIndex,
-                upsertIndicatorInspectorFactory, secondaryKeysPipeline, prevSecondaryKeysPipeline);
+                modCallbackFactory, fieldPermutation, inputRecDesc, operationFieldIndex, operationInspectorFactory,
+                secondaryKeysPipeline, prevSecondaryKeysPipeline);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
index 303bece..3fe3e85 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
@@ -24,7 +24,7 @@
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.operators.meta.PipelineAssembler;
@@ -48,12 +48,11 @@
 
     public LSMSecondaryUpsertWithNestedPlanOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, IModificationOperationCallbackFactory modCallbackFactory,
-            int[] fieldPermutation, RecordDescriptor inputRecDesc, int upsertIndicatorFieldIndex,
-            IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory,
-            List<AlgebricksPipeline> secondaryKeysPipeline, List<AlgebricksPipeline> prevSecondaryKeysPipeline)
-            throws HyracksDataException {
+            int[] fieldPermutation, RecordDescriptor inputRecDesc, int operationFieldIndex,
+            IBinaryIntegerInspectorFactory operationInspectorFactory, List<AlgebricksPipeline> secondaryKeysPipeline,
+            List<AlgebricksPipeline> prevSecondaryKeysPipeline) throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, modCallbackFactory, null, fieldPermutation, inputRecDesc,
-                upsertIndicatorFieldIndex, upsertIndicatorInspectorFactory, null);
+                operationFieldIndex, operationInspectorFactory, null);
         this.numberOfPrimaryKeyAndFilterFields = fieldPermutation.length;
         this.startOfNewKeyPipelines = buildStartOfPipelines(secondaryKeysPipeline, inputRecDesc, false);
         this.startOfPrevKeyPipelines = buildStartOfPipelines(prevSecondaryKeysPipeline, inputRecDesc, true);
@@ -110,9 +109,9 @@
 
             // Insert all of our new keys, if the PIDX operation was also an UPSERT (and not just a DELETE).
             frameTuple.reset(accessor, i);
-            if (upsertIndicatorInspector.getBooleanValue(frameTuple.getFieldData(upsertIndicatorFieldIndex),
-                    frameTuple.getFieldStart(upsertIndicatorFieldIndex),
-                    frameTuple.getFieldLength(upsertIndicatorFieldIndex))) {
+            int operation = operationInspector.getIntegerValue(frameTuple.getFieldData(operationFieldIndex),
+                    frameTuple.getFieldStart(operationFieldIndex), frameTuple.getFieldLength(operationFieldIndex));
+            if (operation == UPSERT_NEW || operation == UPSERT_EXISTING) {
                 writeTupleToPipelineStarts(buffer, i, startOfNewKeyPipelines);
             }
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 8540d0b..9bafe3e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -218,10 +218,10 @@
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
             IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
             IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalFilteringKeys, ILogicalExpression filterExpr,
-            LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys,
-            LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor inputDesc, JobGenContext context,
-            JobSpecification spec, List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException;
+            List<LogicalVariable> additionalFilteringKeys, ILogicalExpression filterExpr, LogicalVariable operationVar,
+            List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKeys,
+            RecordDescriptor inputDesc, JobGenContext context, JobSpecification spec,
+            List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException;
 
     public ITupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
             IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
index 7b6ed26..f2ac41a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
@@ -45,7 +45,7 @@
  * perform. In the case of bulk-loading, {@link #operation} will be INSERT and the {@link #bulkload} flag will be
  * raised. {@link #additionalFilteringExpressions} and {@link #numberOfAdditionalNonFilteringFields} refers to the
  * additionalFilteringExpressions, numberOfAdditionalNonFilteringFields found in the corresponding primary index
- * {@link InsertDeleteUpsertOperator} (i.e. to specify LSM filters). {@link #upsertIndicatorExpr} also originates from
+ * {@link InsertDeleteUpsertOperator} (i.e. to specify LSM filters). {@link #operationExpr} also originates from
  * {@link InsertDeleteUpsertOperator}, and is only set when the operation is of kind UPSERT.
  * <p>
  *
@@ -84,7 +84,7 @@
     // used for upsert operations
     private List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs;
     private Mutable<ILogicalExpression> prevAdditionalFilteringExpression;
-    private Mutable<ILogicalExpression> upsertIndicatorExpr;
+    private Mutable<ILogicalExpression> operationExpr;
     private final int numberOfAdditionalNonFilteringFields;
 
     public IndexInsertDeleteUpsertOperator(IDataSourceIndex<?, ?> dataSourceIndex,
@@ -129,8 +129,8 @@
                 }
             }
         }
-        // Upsert indicator var <For upsert>
-        if (upsertIndicatorExpr != null && visitor.transform(upsertIndicatorExpr)) {
+        // Operation indicator var <For upsert>
+        if (operationExpr != null && visitor.transform(operationExpr)) {
             b = true;
         }
         // Old secondary <For upsert>
@@ -177,8 +177,8 @@
                 e.getValue().getUsedVariables(vars);
             }
         }
-        if (getUpsertIndicatorExpr() != null) {
-            getUpsertIndicatorExpr().getValue().getUsedVariables(vars);
+        if (getOperationExpr() != null) {
+            getOperationExpr().getValue().getUsedVariables(vars);
         }
     }
 
@@ -273,11 +273,11 @@
         return numberOfAdditionalNonFilteringFields;
     }
 
-    public Mutable<ILogicalExpression> getUpsertIndicatorExpr() {
-        return upsertIndicatorExpr;
+    public Mutable<ILogicalExpression> getOperationExpr() {
+        return operationExpr;
     }
 
-    public void setUpsertIndicatorExpr(Mutable<ILogicalExpression> upsertIndicatorExpr) {
-        this.upsertIndicatorExpr = upsertIndicatorExpr;
+    public void setOperationExpr(Mutable<ILogicalExpression> operationExpr) {
+        this.operationExpr = operationExpr;
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
index ce2f801..5a54c7e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -59,9 +59,9 @@
     // previous additional fields (for UPSERT)
     private List<LogicalVariable> prevAdditionalNonFilteringVars;
     private List<Object> prevAdditionalNonFilteringTypes;
-    // a boolean variable that indicates whether it's a delete operation (false) or upsert operation (true)
-    private LogicalVariable upsertIndicatorVar;
-    private Object upsertIndicatorVarType;
+    // int describing the upsert (e.g. upserting a new tuple or to an existing tuple or just deleting an existing one)
+    private LogicalVariable operationVar;
+    private Object operationVarType;
 
     public InsertDeleteUpsertOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
             List<Mutable<ILogicalExpression>> primaryKeyExprs,
@@ -88,7 +88,7 @@
     public void recomputeSchema() throws AlgebricksException {
         schema = new ArrayList<LogicalVariable>();
         if (operation == Kind.UPSERT) {
-            schema.add(upsertIndicatorVar);
+            schema.add(operationVar);
             // The upsert case also produces the previous record
             schema.add(prevRecordVar);
             if (additionalNonFilteringExpressions != null) {
@@ -103,7 +103,7 @@
 
     public void getProducedVariables(Collection<LogicalVariable> producedVariables) {
         if (operation == Kind.UPSERT) {
-            producedVariables.add(upsertIndicatorVar);
+            producedVariables.add(operationVar);
             producedVariables.add(prevRecordVar);
             if (prevAdditionalNonFilteringVars != null) {
                 producedVariables.addAll(prevAdditionalNonFilteringVars);
@@ -150,7 +150,7 @@
             @Override
             public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources) {
                 if (operation == Kind.UPSERT) {
-                    target.addVariable(upsertIndicatorVar);
+                    target.addVariable(operationVar);
                     target.addVariable(prevRecordVar);
                     if (prevAdditionalNonFilteringVars != null) {
                         for (LogicalVariable var : prevAdditionalNonFilteringVars) {
@@ -175,7 +175,7 @@
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         PropagatingTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
         if (operation == Kind.UPSERT) {
-            env.setVarType(upsertIndicatorVar, upsertIndicatorVarType);
+            env.setVarType(operationVar, operationVarType);
             env.setVarType(prevRecordVar, prevRecordType);
             if (prevAdditionalNonFilteringVars != null) {
                 for (int i = 0; i < prevAdditionalNonFilteringVars.size(); i++) {
@@ -229,20 +229,20 @@
         this.prevRecordVar = prevRecordVar;
     }
 
-    public LogicalVariable getUpsertIndicatorVar() {
-        return upsertIndicatorVar;
+    public LogicalVariable getOperationVar() {
+        return operationVar;
     }
 
-    public void setUpsertIndicatorVar(LogicalVariable upsertIndicatorVar) {
-        this.upsertIndicatorVar = upsertIndicatorVar;
+    public void setOperationVar(LogicalVariable operationVar) {
+        this.operationVar = operationVar;
     }
 
-    public Object getUpsertIndicatorVarType() {
-        return upsertIndicatorVarType;
+    public Object getOperationVarType() {
+        return operationVarType;
     }
 
-    public void setUpsertIndicatorVarType(Object upsertIndicatorVarType) {
-        this.upsertIndicatorVarType = upsertIndicatorVarType;
+    public void setOperationVarType(Object operationVarType) {
+        this.operationVarType = operationVarType;
     }
 
     public void setPrevRecordType(Object recordType) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index a9f9626..c4b4e47 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -617,7 +617,7 @@
                 || !Objects.equals(op.getPrevSecondaryKeyExprs(), insertOpArg.getPrevSecondaryKeyExprs())
                 || !Objects.equals(op.getPrevAdditionalFilteringExpression(),
                         insertOpArg.getPrevAdditionalFilteringExpression())
-                || !Objects.equals(op.getUpsertIndicatorExpr(), insertOpArg.getUpsertIndicatorExpr())
+                || !Objects.equals(op.getOperationExpr(), insertOpArg.getOperationExpr())
                 || (op.getNumberOfAdditionalNonFilteringFields() != insertOpArg
                         .getNumberOfAdditionalNonFilteringFields())) {
             return Boolean.FALSE;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 6e8b425..5aa63ae 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -421,8 +421,8 @@
             Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
         boolean producedVarFound = false;
         if (op.getOperation() == InsertDeleteUpsertOperator.Kind.UPSERT) {
-            if (op.getUpsertIndicatorVar() != null && op.getUpsertIndicatorVar().equals(pair.first)) {
-                op.setUpsertIndicatorVar(pair.second);
+            if (op.getOperationVar() != null && op.getOperationVar().equals(pair.first)) {
+                op.setOperationVar(pair.second);
                 producedVarFound = true;
             } else if (op.getBeforeOpRecordVar() != null && op.getBeforeOpRecordVar().equals(pair.first)) {
                 op.setPrevRecordVar(pair.second);
@@ -453,7 +453,7 @@
         substUsedVariablesInExpr(op.getSecondaryKeyExpressions(), pair.first, pair.second);
         substUsedVariablesInExpr(op.getFilterExpression(), pair.first, pair.second);
         substUsedVariablesInExpr(op.getAdditionalFilteringExpressions(), pair.first, pair.second);
-        substUsedVariablesInExpr(op.getUpsertIndicatorExpr(), pair.first, pair.second);
+        substUsedVariablesInExpr(op.getOperationExpr(), pair.first, pair.second);
         substUsedVariablesInExpr(op.getPrevSecondaryKeyExprs(), pair.first, pair.second);
         substUsedVariablesInExpr(op.getPrevAdditionalFilteringExpression(), pair.first, pair.second);
         if (!op.getNestedPlans().isEmpty()) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 23fe3b2..4fb30b4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -429,8 +429,8 @@
                 e.getValue().getUsedVariables(usedVariables);
             }
         }
-        if (op.getUpsertIndicatorExpr() != null) {
-            op.getUpsertIndicatorExpr().getValue().getUsedVariables(usedVariables);
+        if (op.getOperationExpr() != null) {
+            op.getOperationExpr().getValue().getUsedVariables(usedVariables);
         }
         visitNestedPlans(op);
         return null;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index 2d9dabe..26581b1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -57,14 +57,14 @@
     private final ILogicalExpression filterExpr;
     private final IDataSourceIndex<?, ?> dataSourceIndex;
     private final List<LogicalVariable> additionalFilteringKeys;
-    private final LogicalVariable upsertIndicatorVar;
+    private final LogicalVariable operationVar;
     private final List<LogicalVariable> prevSecondaryKeys;
     private final LogicalVariable prevAdditionalFilteringKey;
     private final int numOfAdditionalNonFilteringFields;
 
     public IndexInsertDeleteUpsertPOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
             List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
-            IDataSourceIndex<?, ?> dataSourceIndex, LogicalVariable upsertIndicatorVar,
+            IDataSourceIndex<?, ?> dataSourceIndex, LogicalVariable operationVar,
             List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey,
             int numOfAdditionalNonFilteringFields) {
         this.primaryKeys = primaryKeys;
@@ -76,7 +76,7 @@
         }
         this.dataSourceIndex = dataSourceIndex;
         this.additionalFilteringKeys = additionalFilteringKeys;
-        this.upsertIndicatorVar = upsertIndicatorVar;
+        this.operationVar = operationVar;
         this.prevSecondaryKeys = prevSecondaryKeys;
         this.prevAdditionalFilteringKey = prevAdditionalFilteringKey;
         this.numOfAdditionalNonFilteringFields = numOfAdditionalNonFilteringFields;
@@ -157,7 +157,7 @@
                 break;
             case UPSERT:
                 runtimeAndConstraints = mp.getIndexUpsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas,
-                        typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, upsertIndicatorVar,
+                        typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, operationVar,
                         prevSecondaryKeys, prevAdditionalFilteringKey, inputDesc, context, spec, secondaryKeyPipelines);
                 break;
             default:
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index b95971f..cd0d996 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -428,11 +428,11 @@
                 return new IndexBulkloadPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
                         opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex());
             } else {
-                LogicalVariable upsertIndicatorVar = null;
+                LogicalVariable operationVar = null;
                 List<LogicalVariable> prevSecondaryKeys = null;
                 LogicalVariable prevAdditionalFilteringKey = null;
                 if (opInsDel.getOperation() == Kind.UPSERT) {
-                    upsertIndicatorVar = getKey(opInsDel.getUpsertIndicatorExpr().getValue());
+                    operationVar = getKey(opInsDel.getOperationExpr().getValue());
                     prevSecondaryKeys = new ArrayList<>();
                     getKeys(opInsDel.getPrevSecondaryKeyExprs(), prevSecondaryKeys);
                     if (opInsDel.getPrevAdditionalFilteringExpression() != null) {
@@ -442,9 +442,8 @@
                     }
                 }
                 return new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
-                        opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(), upsertIndicatorVar,
-                        prevSecondaryKeys, prevAdditionalFilteringKey,
-                        opInsDel.getNumberOfAdditionalNonFilteringFields());
+                        opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(), operationVar, prevSecondaryKeys,
+                        prevAdditionalFilteringKey, opInsDel.getNumberOfAdditionalNonFilteringFields());
             }
         }