Merge "Merge branch 'gerrit/cheshire-cat'"
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index e567974..97954eb 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -629,8 +629,8 @@
}
if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
- indexUpdate.setUpsertIndicatorExpr(new MutableObject<>(
- new VariableReferenceExpression(primaryIndexModificationOp.getUpsertIndicatorVar())));
+ indexUpdate.setOperationExpr(new MutableObject<>(
+ new VariableReferenceExpression(primaryIndexModificationOp.getOperationVar())));
}
context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 16d5878..c053ab9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -513,8 +513,8 @@
upsertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadVarRef, varRefsForLoading,
Collections.singletonList(new MutableObject<>(metaVarRef)), InsertDeleteUpsertOperator.Kind.UPSERT,
false);
- upsertOp.setUpsertIndicatorVar(context.newVar());
- upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
+ upsertOp.setOperationVar(context.newVar());
+ upsertOp.setOperationVarType(BuiltinType.AINT8);
// Create and add a new variable used for representing the original record
upsertOp.setPrevRecordVar(context.newVar());
upsertOp.setPrevRecordType(targetDatasource.getItemType());
@@ -567,8 +567,8 @@
upsertOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
upsertOp.setAdditionalFilteringExpressions(filterExprs);
upsertOp.setSourceLocation(sourceLoc);
- upsertOp.setUpsertIndicatorVar(context.newVar());
- upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
+ upsertOp.setOperationVar(context.newVar());
+ upsertOp.setOperationVarType(BuiltinType.AINT8);
// Create and add a new variable used for representing the original record
upsertOp.setPrevRecordVar(context.newVar());
upsertOp.setPrevRecordType(recordType);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
index 2a22ac6..1ed57c0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
@@ -24,6 +24,8 @@
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -98,7 +100,11 @@
recordBuilder.append("{\"values\":[");
for (int j = 0; j < tuple.getFieldCount(); ++j) {
bbis.setByteBuffer(ByteBuffer.wrap(tuple.getFieldData(j)), tuple.getFieldStart(j));
- recordBuilder.append(secondaryRecDesc.getFields()[j].deserialize(dis));
+ IAObject field = (IAObject) secondaryRecDesc.getFields()[j].deserialize(dis);
+ if (field.getType().getTypeTag() == ATypeTag.MISSING) {
+ continue;
+ }
+ recordBuilder.append(field);
recordBuilder.append(",");
}
recordBuilder.deleteCharAt(recordBuilder.length() - 1);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index ccb73b6..fcdc25a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -389,7 +389,7 @@
// Gets the primary key permutation for upserts.
private static int[] getPrimaryKeyPermutationForUpsert(Dataset dataset) {
- // upsertIndicatorVar + prev record
+ // two leading fields: the upsert operation var and the previous record
int f = 2;
// add the previous meta second
if (dataset.hasMetaPart()) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/index-bad-fields/index-bad-fields.004.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/index-bad-fields/index-bad-fields.004.adm
index ac116183..272c3c8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/index-bad-fields/index-bad-fields.004.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/index-bad-fields/index-bad-fields.004.adm
@@ -1 +1,2 @@
+{ "values": [ 1 ] }
{ "values": [ 95, 2 ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.4.adm
index fab2047..4725f37 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.4.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.4.adm
@@ -1,3 +1,7 @@
+{ "values": [ null, 1 ] }
+{ "values": [ null, 2 ] }
+{ "values": [ 3 ] }
+{ "values": [ 4 ] }
{ "values": [ 555, 5 ] }
{ "values": [ 888, 8 ] }
{ "values": [ 999, 9 ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.5.adm
index fab2047..8d89e67 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.5.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.5.adm
@@ -1,3 +1,9 @@
+{ "values": [ null, 1 ] }
+{ "values": [ null, 2 ] }
+{ "values": [ null, 6 ] }
+{ "values": [ 3 ] }
+{ "values": [ 4 ] }
+{ "values": [ 7 ] }
{ "values": [ 555, 5 ] }
{ "values": [ 888, 8 ] }
{ "values": [ 999, 9 ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.8.adm
index 85c91e6..50f4b46 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.8.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/insert_nulls_with_secondary_idx/insert_nulls_with_secondary_idx.8.adm
@@ -1,3 +1,9 @@
+{ "values": [ null, 1 ] }
+{ "values": [ null, 7 ] }
+{ "values": [ null, 9 ] }
+{ "values": [ 3 ] }
+{ "values": [ 6 ] }
+{ "values": [ 8 ] }
{ "values": [ 222, 2 ] }
{ "values": [ 444, 4 ] }
{ "values": [ 555, 5 ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 878c94e..832f3e9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -38,7 +38,7 @@
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final long INITIAL_CHECKPOINT_ID = 0;
// TODO(mblow): remove this marker & related logic once we no longer are able to read indexes prior to the fix
- private static final long HAS_NULL_MISSING_VALUES_FIX = -2;
+ private static final long HAS_NULL_MISSING_VALUES_FIX = -3;
private long id;
private long validComponentSequence;
private long lowWatermark;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 1721975..d970b3a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -69,8 +69,8 @@
import org.apache.asterix.external.provider.AdapterFactoryProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.formats.base.IDataFormat;
-import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.BinaryIntegerInspector;
import org.apache.asterix.formats.nontagged.LinearizeComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
import org.apache.asterix.metadata.MetadataManager;
@@ -803,12 +803,12 @@
IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
- ILogicalExpression filterExpr, LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys,
+ ILogicalExpression filterExpr, LogicalVariable operationVar, List<LogicalVariable> prevSecondaryKeys,
LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec, List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException {
return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema,
inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, recordDesc,
- context, spec, false, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKey,
+ context, spec, false, operationVar, prevSecondaryKeys, prevAdditionalFilteringKey,
secondaryKeysPipelines, null);
}
@@ -1213,7 +1213,7 @@
List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr,
RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec, boolean bulkload,
- LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys,
+ LogicalVariable operationVar, List<LogicalVariable> prevSecondaryKeys,
LogicalVariable prevAdditionalFilteringKey, List<List<AlgebricksPipeline>> secondaryKeysPipelines,
IOperatorSchema pipelineTopSchema) throws AlgebricksException {
String indexName = dataSourceIndex.getId();
@@ -1246,30 +1246,30 @@
case BTREE:
return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp,
- bulkload, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
+ bulkload, operationVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
case ARRAY:
if (bulkload) {
// In the case of bulk-load, we do not handle any nested plans. We perform the exact same behavior
// as a normal B-Tree bulk load.
return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec,
- indexOp, bulkload, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
+ indexOp, bulkload, operationVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
} else {
return getArrayIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
- additionalNonKeyFields, inputRecordDesc, spec, indexOp, upsertIndicatorVar,
+ additionalNonKeyFields, inputRecordDesc, spec, indexOp, operationVar,
secondaryKeysPipelines);
}
case RTREE:
return getRTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp,
- bulkload, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
+ bulkload, operationVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
case SINGLE_PARTITION_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
case LENGTH_PARTITIONED_WORD_INVIX:
case LENGTH_PARTITIONED_NGRAM_INVIX:
return getInvertedIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp,
- secondaryIndex.getIndexType(), bulkload, upsertIndicatorVar, prevSecondaryKeys,
+ secondaryIndex.getIndexType(), bulkload, operationVar, prevSecondaryKeys,
prevAdditionalFilteringKeys);
default:
throw new AlgebricksException(
@@ -1281,7 +1281,7 @@
String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
AsterixTupleFilterFactory filterFactory, RecordDescriptor inputRecordDesc, JobGenContext context,
- JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable upsertIndicatorVar,
+ JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
@@ -1350,10 +1350,10 @@
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null,
BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory);
} else if (indexOp == IndexOperation.UPSERT) {
- int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar);
+ int operationFieldIndex = propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
- filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex,
- BinaryBooleanInspector.FACTORY, prevFieldPermutation);
+ filterFactory, modificationCallbackFactory, operationFieldIndex, BinaryIntegerInspector.FACTORY,
+ prevFieldPermutation);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
filterFactory, false, modificationCallbackFactory);
@@ -1367,8 +1367,8 @@
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getArrayIndexRuntime(DataverseName dataverseName,
String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
List<LogicalVariable> additionalNonKeyFields, RecordDescriptor inputRecordDesc, JobSpecification spec,
- IndexOperation indexOp, LogicalVariable upsertIndicatorVar,
- List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException {
+ IndexOperation indexOp, LogicalVariable operationVar, List<List<AlgebricksPipeline>> secondaryKeysPipelines)
+ throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
int numPrimaryKeys = primaryKeys.size();
@@ -1404,9 +1404,9 @@
storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
IOperatorDescriptor op;
if (indexOp == IndexOperation.UPSERT) {
- int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar);
+ int operationFieldIndex = propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertWithNestedPlanOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
- idfh, modificationCallbackFactory, upsertIndicatorFieldIndex, BinaryBooleanInspector.FACTORY,
+ idfh, modificationCallbackFactory, operationFieldIndex, BinaryIntegerInspector.FACTORY,
secondaryKeysPipelines.get(0), secondaryKeysPipelines.get(1));
} else {
op = new LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor(spec, inputRecordDesc,
@@ -1422,7 +1422,7 @@
String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable upsertIndicatorVar,
+ JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
@@ -1505,10 +1505,10 @@
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false,
indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory);
} else if (indexOp == IndexOperation.UPSERT) {
- int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar);
+ int operationFieldIndex = propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation,
- indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex,
- BinaryBooleanInspector.FACTORY, prevFieldPermutation);
+ indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, operationFieldIndex,
+ BinaryIntegerInspector.FACTORY, prevFieldPermutation);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory);
@@ -1521,7 +1521,7 @@
List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp,
- IndexType indexType, boolean bulkload, LogicalVariable upsertIndicatorVar,
+ IndexType indexType, boolean bulkload, LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
throws AlgebricksException {
// Check the index is length-partitioned or not.
@@ -1536,7 +1536,7 @@
// Sanity checks.
if (primaryKeys.size() > 1) {
throw new AlgebricksException(
- "Cannot create inverted index on " + dataset(PLURAL) + "with composite primary key.");
+ "Cannot create inverted index on " + dataset(PLURAL) + " with composite primary key.");
}
// The size of secondaryKeys can be two if it receives input from its
// TokenizeOperator- [token, number of token]
@@ -1618,10 +1618,10 @@
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory,
null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory);
} else if (indexOp == IndexOperation.UPSERT) {
- int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar);
+ int upsertOperationFieldIndex = propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory,
- filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex,
- BinaryBooleanInspector.FACTORY, prevFieldPermutation);
+ filterFactory, modificationCallbackFactory, upsertOperationFieldIndex,
+ BinaryIntegerInspector.FACTORY, prevFieldPermutation);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
indexDataFlowFactory, filterFactory, false, modificationCallbackFactory);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 2e10d77..158efb2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -475,9 +475,9 @@
IDataFormat dataFormat = metadataProvider.getDataFormat();
int f = 0;
- // add the upsert indicator var
- outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(BuiltinType.ABOOLEAN);
- outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(BuiltinType.ABOOLEAN);
+ // add the upsert operation var
+ outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(BuiltinType.AINT8);
+ outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(BuiltinType.AINT8);
f++;
// add the previous record
outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(itemType);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index 33f5b62..9645e7a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -89,12 +89,6 @@
AlgebricksMetaOperatorDescriptor asterixAssignOp =
createExternalAssignOp(spec, indexDetails.getKeyFieldNames().size(), secondaryRecDesc);
- // If any of the secondary fields are nullable, then add a select op that filters nulls.
- AlgebricksMetaOperatorDescriptor selectOp = null;
- if (anySecondaryKeyIsNullable || isOverridingKeyFieldTypes) {
- selectOp = createFilterNullsSelectOp(spec, indexDetails.getKeyFieldNames().size(), secondaryRecDesc);
- }
-
// Sort by secondary keys.
ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
// Create secondary BTree bulk load op.
@@ -117,12 +111,7 @@
spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
root = metaOp;
spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
- if (anySecondaryKeyIsNullable || isOverridingKeyFieldTypes) {
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
- } else {
- spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
- }
+ spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
spec.addRoot(root);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
@@ -149,13 +138,6 @@
spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
sourceOp = targetOp;
- if (anySecondaryKeyIsNullable || isOverridingKeyFieldTypes) {
- // if any of the secondary fields are nullable, then add a select op that filters nulls.
- // assign op ----> select op
- targetOp = createFilterNullsSelectOp(spec, indexDetails.getKeyFieldNames().size(), secondaryRecDesc);
- spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
- sourceOp = targetOp;
- }
// no need to sort if the index is secondary primary index
if (!indexDetails.getKeyFieldNames().isEmpty()) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 8e0de5f..591ff9a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -30,7 +30,7 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
-import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.AInt8;
import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -80,6 +80,9 @@
public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
+ public static final AInt8 UPSERT_NEW = new AInt8((byte) 0);
+ public static final AInt8 UPSERT_EXISTING = new AInt8((byte) 1);
+ public static final AInt8 DELETE_EXISTING = new AInt8((byte) 2);
private static final Logger LOGGER = LogManager.getLogger();
private static final ThreadLocal<DateFormat> DATE_FORMAT =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
@@ -172,21 +175,22 @@
if (cursor.hasNext()) {
cursor.next();
prevTuple = cursor.getTuple();
- appendUpsertIndicator(!isDelete);
+ appendOperationIndicator(!isDelete, true);
appendFilterToPrevTuple();
appendPrevRecord();
appendPreviousMeta();
appendFilterToOutput();
} else {
- appendUpsertIndicator(!isDelete);
+ appendOperationIndicator(!isDelete, false);
appendPreviousTupleAsMissing();
}
} finally {
cursor.close(); // end the search
}
} else {
+ // simple upsert into a non-filtered dataset having no secondary indexes
searchCallback.before(key); // lock
- appendUpsertIndicator(!isDelete);
+ appendOperationIndicator(true, false);
appendPreviousTupleAsMissing();
}
beforeModification(tuple);
@@ -353,8 +357,17 @@
}
}
- protected void appendUpsertIndicator(boolean isUpsert) throws IOException {
- recordDesc.getFields()[0].serialize(isUpsert ? ABoolean.TRUE : ABoolean.FALSE, dos);
+ @SuppressWarnings("unchecked") // unchecked serialize() call on recordDesc.getFields()[0]
+ protected void appendOperationIndicator(boolean isUpsert, boolean prevTupleExists) throws IOException {
+ if (isUpsert) {
+ if (prevTupleExists) {
+ recordDesc.getFields()[0].serialize(UPSERT_EXISTING, dos);
+ } else {
+ recordDesc.getFields()[0].serialize(UPSERT_NEW, dos);
+ }
+ } else {
+ recordDesc.getFields()[0].serialize(DELETE_EXISTING, dos);
+ }
tb.addFieldEndOffset();
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
index a4b4012..3231162 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
@@ -19,7 +19,7 @@
package org.apache.asterix.runtime.operators;
import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -33,21 +33,21 @@
public class LSMSecondaryUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final int[] prevValuePermutation;
- protected final int upsertIndicatorFieldIndex;
- protected final IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory;
+ protected final int operationFieldIndex;
+ protected final IBinaryIntegerInspectorFactory operationInspectorFactory;
public LSMSecondaryUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory,
- int upsertIndicatorFieldIndex, IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory,
+ int operationFieldIndex, IBinaryIntegerInspectorFactory operationInspectorFactory,
int[] prevValuePermutation) {
super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, indexHelperFactory, tupleFilterFactory, false,
modificationOpCallbackFactory);
this.prevValuePermutation = prevValuePermutation;
- this.upsertIndicatorFieldIndex = upsertIndicatorFieldIndex;
- this.upsertIndicatorInspectorFactory = upsertIndicatorInspectorFactory;
+ this.operationFieldIndex = operationFieldIndex;
+ this.operationInspectorFactory = operationInspectorFactory;
}
@Override
@@ -55,7 +55,7 @@
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
RecordDescriptor intputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
return new LSMSecondaryUpsertOperatorNodePushable(ctx, partition, indexHelperFactory, modCallbackFactory,
- tupleFilterFactory, fieldPermutation, intputRecDesc, upsertIndicatorFieldIndex,
- upsertIndicatorInspectorFactory, prevValuePermutation);
+ tupleFilterFactory, fieldPermutation, intputRecDesc, operationFieldIndex, operationInspectorFactory,
+ prevValuePermutation);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index b588323..482be06 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -19,14 +19,13 @@
package org.apache.asterix.runtime.operators;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.TypeTagUtil;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -48,38 +47,32 @@
* -If old secondary index tuple == new secondary index tuple
* --do nothing
* -else
- * --If any old field is null/missing?
- * ---do nothing
- * --else
- * ---delete old secondary index tuple
- * --If any new field is null/missing?
- * ---do nothing
- * --else
- * ---insert new secondary index tuple
+ * --perform the operation based on the operation kind
*/
public class LSMSecondaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
- private final PermutingFrameTupleReference prevValueTuple = new PermutingFrameTupleReference();
- private final int numberOfFields;
- private final boolean isPrimaryKeyIndex;
+ protected static final int UPSERT_NEW = LSMPrimaryUpsertOperatorNodePushable.UPSERT_NEW.getByteValue();
+ protected static final int UPSERT_EXISTING = LSMPrimaryUpsertOperatorNodePushable.UPSERT_EXISTING.getByteValue();
+ protected static final int DELETE_EXISTING = LSMPrimaryUpsertOperatorNodePushable.DELETE_EXISTING.getByteValue();
- protected final int upsertIndicatorFieldIndex;
- protected final IBinaryBooleanInspector upsertIndicatorInspector;
+ private final PermutingFrameTupleReference prevTuple = new PermutingFrameTupleReference();
+ private final int numberOfFields;
+
+ protected final int operationFieldIndex;
+ protected final IBinaryIntegerInspector operationInspector;
protected AbstractIndexModificationOperationCallback abstractModCallback;
public LSMSecondaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, IModificationOperationCallbackFactory modCallbackFactory,
ITupleFilterFactory tupleFilterFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
- int upsertIndicatorFieldIndex, IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory,
- int[] prevValuePermutation) throws HyracksDataException {
+ int operationFieldIndex, IBinaryIntegerInspectorFactory operationInspectorFactory,
+ int[] prevTuplePermutation) throws HyracksDataException {
super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
modCallbackFactory, tupleFilterFactory);
- this.prevValueTuple.setFieldPermutation(prevValuePermutation);
- this.upsertIndicatorFieldIndex = upsertIndicatorFieldIndex;
- this.upsertIndicatorInspector = upsertIndicatorInspectorFactory.createBinaryBooleanInspector(ctx);
+ this.prevTuple.setFieldPermutation(prevTuplePermutation);
+ this.operationFieldIndex = operationFieldIndex;
+ this.operationInspector = operationInspectorFactory.createBinaryIntegerInspector(ctx);
this.numberOfFields = fieldPermutation.length;
- // a primary key index only has primary keys, and thus these two permutations are the same
- this.isPrimaryKeyIndex = Arrays.equals(fieldPermutation, prevValuePermutation);
}
@Override
@@ -97,36 +90,24 @@
for (int i = 0; i < tupleCount; i++) {
try {
frameTuple.reset(accessor, i);
- boolean isUpsert =
- upsertIndicatorInspector.getBooleanValue(frameTuple.getFieldData(upsertIndicatorFieldIndex),
- frameTuple.getFieldStart(upsertIndicatorFieldIndex),
- frameTuple.getFieldLength(upsertIndicatorFieldIndex));
- // if both previous value and new value are null, then we skip
+ int operation = operationInspector.getIntegerValue(frameTuple.getFieldData(operationFieldIndex),
+ frameTuple.getFieldStart(operationFieldIndex), frameTuple.getFieldLength(operationFieldIndex));
tuple.reset(accessor, i);
- prevValueTuple.reset(accessor, i);
+ prevTuple.reset(accessor, i);
- boolean newTupleHasNullOrMissing = hasNullOrMissing(tuple);
- boolean oldTupleHasNullOrMissing = hasNullOrMissing(prevValueTuple);
- if (newTupleHasNullOrMissing && oldTupleHasNullOrMissing) {
- // No op
- continue;
- }
- // At least, one is not null
- if (!isPrimaryKeyIndex && TupleUtils.equalTuples(tuple, prevValueTuple, numberOfFields)) {
- // For a secondary index, if the secondary key values do not change, we can skip upserting it.
- // However, for a primary key index, we cannot do this because it only contains primary keys
- // which are always the same
- continue;
- }
- // if all old fields are known values, then delete. skip deleting if any is null or missing
- if (!oldTupleHasNullOrMissing) {
- abstractModCallback.setOp(Operation.DELETE);
- lsmAccessor.forceDelete(prevValueTuple);
- }
- // if all new fields are known values, then insert. skip inserting if any is null or missing
- if (isUpsert && !newTupleHasNullOrMissing) {
+ if (operation == UPSERT_NEW) {
abstractModCallback.setOp(Operation.INSERT);
lsmAccessor.forceInsert(tuple);
+ } else if (operation == UPSERT_EXISTING) {
+ if (!TupleUtils.equalTuples(tuple, prevTuple, numberOfFields)) {
+ abstractModCallback.setOp(Operation.DELETE);
+ lsmAccessor.forceDelete(prevTuple);
+ abstractModCallback.setOp(Operation.INSERT);
+ lsmAccessor.forceInsert(tuple);
+ }
+ } else if (operation == DELETE_EXISTING) {
+ abstractModCallback.setOp(Operation.DELETE);
+ lsmAccessor.forceDelete(prevTuple);
}
} catch (Exception e) {
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java
index d077987..bbf0af1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java
@@ -20,7 +20,7 @@
import java.util.List;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -39,11 +39,11 @@
public LSMSecondaryUpsertWithNestedPlanOperatorDescriptor(JobSpecification spec, RecordDescriptor outRecDesc,
int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
- IModificationOperationCallbackFactory modCallbackFactory, int upsertIndicatorFieldIndex,
- IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory,
- List<AlgebricksPipeline> secondaryKeysPipeline, List<AlgebricksPipeline> prevSecondaryKeysPipeline) {
- super(spec, outRecDesc, fieldPermutation, indexHelperFactory, null, modCallbackFactory,
- upsertIndicatorFieldIndex, upsertIndicatorInspectorFactory, null);
+ IModificationOperationCallbackFactory modCallbackFactory, int operationFieldIndex,
+ IBinaryIntegerInspectorFactory operationInspectorFactory, List<AlgebricksPipeline> secondaryKeysPipeline,
+ List<AlgebricksPipeline> prevSecondaryKeysPipeline) {
+ super(spec, outRecDesc, fieldPermutation, indexHelperFactory, null, modCallbackFactory, operationFieldIndex,
+ operationInspectorFactory, null);
this.secondaryKeysPipeline = secondaryKeysPipeline;
this.prevSecondaryKeysPipeline = prevSecondaryKeysPipeline;
}
@@ -53,7 +53,7 @@
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
return new LSMSecondaryUpsertWithNestedPlanOperatorNodePushable(ctx, partition, indexHelperFactory,
- modCallbackFactory, fieldPermutation, inputRecDesc, upsertIndicatorFieldIndex,
- upsertIndicatorInspectorFactory, secondaryKeysPipeline, prevSecondaryKeysPipeline);
+ modCallbackFactory, fieldPermutation, inputRecDesc, operationFieldIndex, operationInspectorFactory,
+ secondaryKeysPipeline, prevSecondaryKeysPipeline);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
index 303bece..3fe3e85 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
@@ -24,7 +24,7 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.operators.meta.PipelineAssembler;
@@ -48,12 +48,11 @@
public LSMSecondaryUpsertWithNestedPlanOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, IModificationOperationCallbackFactory modCallbackFactory,
- int[] fieldPermutation, RecordDescriptor inputRecDesc, int upsertIndicatorFieldIndex,
- IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory,
- List<AlgebricksPipeline> secondaryKeysPipeline, List<AlgebricksPipeline> prevSecondaryKeysPipeline)
- throws HyracksDataException {
+ int[] fieldPermutation, RecordDescriptor inputRecDesc, int operationFieldIndex,
+ IBinaryIntegerInspectorFactory operationInspectorFactory, List<AlgebricksPipeline> secondaryKeysPipeline,
+ List<AlgebricksPipeline> prevSecondaryKeysPipeline) throws HyracksDataException {
super(ctx, partition, indexHelperFactory, modCallbackFactory, null, fieldPermutation, inputRecDesc,
- upsertIndicatorFieldIndex, upsertIndicatorInspectorFactory, null);
+ operationFieldIndex, operationInspectorFactory, null);
this.numberOfPrimaryKeyAndFilterFields = fieldPermutation.length;
this.startOfNewKeyPipelines = buildStartOfPipelines(secondaryKeysPipeline, inputRecDesc, false);
this.startOfPrevKeyPipelines = buildStartOfPipelines(prevSecondaryKeysPipeline, inputRecDesc, true);
@@ -110,9 +109,9 @@
// Insert all of our new keys, if the PIDX operation was also an UPSERT (and not just a DELETE).
frameTuple.reset(accessor, i);
- if (upsertIndicatorInspector.getBooleanValue(frameTuple.getFieldData(upsertIndicatorFieldIndex),
- frameTuple.getFieldStart(upsertIndicatorFieldIndex),
- frameTuple.getFieldLength(upsertIndicatorFieldIndex))) {
+ int operation = operationInspector.getIntegerValue(frameTuple.getFieldData(operationFieldIndex),
+ frameTuple.getFieldStart(operationFieldIndex), frameTuple.getFieldLength(operationFieldIndex));
+ if (operation == UPSERT_NEW || operation == UPSERT_EXISTING) {
writeTupleToPipelineStarts(buffer, i, startOfNewKeyPipelines);
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 8540d0b..9bafe3e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -218,10 +218,10 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalFilteringKeys, ILogicalExpression filterExpr,
- LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys,
- LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor inputDesc, JobGenContext context,
- JobSpecification spec, List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException;
+ List<LogicalVariable> additionalFilteringKeys, ILogicalExpression filterExpr, LogicalVariable operationVar,
+ List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKeys,
+ RecordDescriptor inputDesc, JobGenContext context, JobSpecification spec,
+ List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException;
public ITupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
index 7b6ed26..f2ac41a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
@@ -45,7 +45,7 @@
* perform. In the case of bulk-loading, {@link #operation} will be INSERT and the {@link #bulkload} flag will be
* raised. {@link #additionalFilteringExpressions} and {@link #numberOfAdditionalNonFilteringFields} refers to the
* additionalFilteringExpressions, numberOfAdditionalNonFilteringFields found in the corresponding primary index
- * {@link InsertDeleteUpsertOperator} (i.e. to specify LSM filters). {@link #upsertIndicatorExpr} also originates from
+ * {@link InsertDeleteUpsertOperator} (i.e. to specify LSM filters). {@link #operationExpr} also originates from
* {@link InsertDeleteUpsertOperator}, and is only set when the operation is of kind UPSERT.
* <p>
*
@@ -84,7 +84,7 @@
// used for upsert operations
private List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs;
private Mutable<ILogicalExpression> prevAdditionalFilteringExpression;
- private Mutable<ILogicalExpression> upsertIndicatorExpr;
+ private Mutable<ILogicalExpression> operationExpr;
private final int numberOfAdditionalNonFilteringFields;
public IndexInsertDeleteUpsertOperator(IDataSourceIndex<?, ?> dataSourceIndex,
@@ -129,8 +129,8 @@
}
}
}
- // Upsert indicator var <For upsert>
- if (upsertIndicatorExpr != null && visitor.transform(upsertIndicatorExpr)) {
+ // Operation indicator var <For upsert>
+ if (operationExpr != null && visitor.transform(operationExpr)) {
b = true;
}
// Old secondary <For upsert>
@@ -177,8 +177,8 @@
e.getValue().getUsedVariables(vars);
}
}
- if (getUpsertIndicatorExpr() != null) {
- getUpsertIndicatorExpr().getValue().getUsedVariables(vars);
+ if (getOperationExpr() != null) {
+ getOperationExpr().getValue().getUsedVariables(vars);
}
}
@@ -273,11 +273,11 @@
return numberOfAdditionalNonFilteringFields;
}
- public Mutable<ILogicalExpression> getUpsertIndicatorExpr() {
- return upsertIndicatorExpr;
+ public Mutable<ILogicalExpression> getOperationExpr() {
+ return operationExpr;
}
- public void setUpsertIndicatorExpr(Mutable<ILogicalExpression> upsertIndicatorExpr) {
- this.upsertIndicatorExpr = upsertIndicatorExpr;
+ public void setOperationExpr(Mutable<ILogicalExpression> operationExpr) {
+ this.operationExpr = operationExpr;
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
index ce2f801..5a54c7e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -59,9 +59,9 @@
// previous additional fields (for UPSERT)
private List<LogicalVariable> prevAdditionalNonFilteringVars;
private List<Object> prevAdditionalNonFilteringTypes;
- // a boolean variable that indicates whether it's a delete operation (false) or upsert operation (true)
- private LogicalVariable upsertIndicatorVar;
- private Object upsertIndicatorVarType;
+ // int describing the kind of upsert (upserting a new tuple, upserting over an existing tuple, or just deleting an existing one)
+ private LogicalVariable operationVar;
+ private Object operationVarType;
public InsertDeleteUpsertOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
List<Mutable<ILogicalExpression>> primaryKeyExprs,
@@ -88,7 +88,7 @@
public void recomputeSchema() throws AlgebricksException {
schema = new ArrayList<LogicalVariable>();
if (operation == Kind.UPSERT) {
- schema.add(upsertIndicatorVar);
+ schema.add(operationVar);
// The upsert case also produces the previous record
schema.add(prevRecordVar);
if (additionalNonFilteringExpressions != null) {
@@ -103,7 +103,7 @@
public void getProducedVariables(Collection<LogicalVariable> producedVariables) {
if (operation == Kind.UPSERT) {
- producedVariables.add(upsertIndicatorVar);
+ producedVariables.add(operationVar);
producedVariables.add(prevRecordVar);
if (prevAdditionalNonFilteringVars != null) {
producedVariables.addAll(prevAdditionalNonFilteringVars);
@@ -150,7 +150,7 @@
@Override
public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources) {
if (operation == Kind.UPSERT) {
- target.addVariable(upsertIndicatorVar);
+ target.addVariable(operationVar);
target.addVariable(prevRecordVar);
if (prevAdditionalNonFilteringVars != null) {
for (LogicalVariable var : prevAdditionalNonFilteringVars) {
@@ -175,7 +175,7 @@
public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
PropagatingTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
if (operation == Kind.UPSERT) {
- env.setVarType(upsertIndicatorVar, upsertIndicatorVarType);
+ env.setVarType(operationVar, operationVarType);
env.setVarType(prevRecordVar, prevRecordType);
if (prevAdditionalNonFilteringVars != null) {
for (int i = 0; i < prevAdditionalNonFilteringVars.size(); i++) {
@@ -229,20 +229,20 @@
this.prevRecordVar = prevRecordVar;
}
- public LogicalVariable getUpsertIndicatorVar() {
- return upsertIndicatorVar;
+ public LogicalVariable getOperationVar() {
+ return operationVar;
}
- public void setUpsertIndicatorVar(LogicalVariable upsertIndicatorVar) {
- this.upsertIndicatorVar = upsertIndicatorVar;
+ public void setOperationVar(LogicalVariable operationVar) {
+ this.operationVar = operationVar;
}
- public Object getUpsertIndicatorVarType() {
- return upsertIndicatorVarType;
+ public Object getOperationVarType() {
+ return operationVarType;
}
- public void setUpsertIndicatorVarType(Object upsertIndicatorVarType) {
- this.upsertIndicatorVarType = upsertIndicatorVarType;
+ public void setOperationVarType(Object operationVarType) {
+ this.operationVarType = operationVarType;
}
public void setPrevRecordType(Object recordType) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index a9f9626..c4b4e47 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -617,7 +617,7 @@
|| !Objects.equals(op.getPrevSecondaryKeyExprs(), insertOpArg.getPrevSecondaryKeyExprs())
|| !Objects.equals(op.getPrevAdditionalFilteringExpression(),
insertOpArg.getPrevAdditionalFilteringExpression())
- || !Objects.equals(op.getUpsertIndicatorExpr(), insertOpArg.getUpsertIndicatorExpr())
+ || !Objects.equals(op.getOperationExpr(), insertOpArg.getOperationExpr())
|| (op.getNumberOfAdditionalNonFilteringFields() != insertOpArg
.getNumberOfAdditionalNonFilteringFields())) {
return Boolean.FALSE;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 6e8b425..5aa63ae 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -421,8 +421,8 @@
Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
boolean producedVarFound = false;
if (op.getOperation() == InsertDeleteUpsertOperator.Kind.UPSERT) {
- if (op.getUpsertIndicatorVar() != null && op.getUpsertIndicatorVar().equals(pair.first)) {
- op.setUpsertIndicatorVar(pair.second);
+ if (op.getOperationVar() != null && op.getOperationVar().equals(pair.first)) {
+ op.setOperationVar(pair.second);
producedVarFound = true;
} else if (op.getBeforeOpRecordVar() != null && op.getBeforeOpRecordVar().equals(pair.first)) {
op.setPrevRecordVar(pair.second);
@@ -453,7 +453,7 @@
substUsedVariablesInExpr(op.getSecondaryKeyExpressions(), pair.first, pair.second);
substUsedVariablesInExpr(op.getFilterExpression(), pair.first, pair.second);
substUsedVariablesInExpr(op.getAdditionalFilteringExpressions(), pair.first, pair.second);
- substUsedVariablesInExpr(op.getUpsertIndicatorExpr(), pair.first, pair.second);
+ substUsedVariablesInExpr(op.getOperationExpr(), pair.first, pair.second);
substUsedVariablesInExpr(op.getPrevSecondaryKeyExprs(), pair.first, pair.second);
substUsedVariablesInExpr(op.getPrevAdditionalFilteringExpression(), pair.first, pair.second);
if (!op.getNestedPlans().isEmpty()) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 23fe3b2..4fb30b4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -429,8 +429,8 @@
e.getValue().getUsedVariables(usedVariables);
}
}
- if (op.getUpsertIndicatorExpr() != null) {
- op.getUpsertIndicatorExpr().getValue().getUsedVariables(usedVariables);
+ if (op.getOperationExpr() != null) {
+ op.getOperationExpr().getValue().getUsedVariables(usedVariables);
}
visitNestedPlans(op);
return null;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index 2d9dabe..26581b1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -57,14 +57,14 @@
private final ILogicalExpression filterExpr;
private final IDataSourceIndex<?, ?> dataSourceIndex;
private final List<LogicalVariable> additionalFilteringKeys;
- private final LogicalVariable upsertIndicatorVar;
+ private final LogicalVariable operationVar;
private final List<LogicalVariable> prevSecondaryKeys;
private final LogicalVariable prevAdditionalFilteringKey;
private final int numOfAdditionalNonFilteringFields;
public IndexInsertDeleteUpsertPOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
- IDataSourceIndex<?, ?> dataSourceIndex, LogicalVariable upsertIndicatorVar,
+ IDataSourceIndex<?, ?> dataSourceIndex, LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey,
int numOfAdditionalNonFilteringFields) {
this.primaryKeys = primaryKeys;
@@ -76,7 +76,7 @@
}
this.dataSourceIndex = dataSourceIndex;
this.additionalFilteringKeys = additionalFilteringKeys;
- this.upsertIndicatorVar = upsertIndicatorVar;
+ this.operationVar = operationVar;
this.prevSecondaryKeys = prevSecondaryKeys;
this.prevAdditionalFilteringKey = prevAdditionalFilteringKey;
this.numOfAdditionalNonFilteringFields = numOfAdditionalNonFilteringFields;
@@ -157,7 +157,7 @@
break;
case UPSERT:
runtimeAndConstraints = mp.getIndexUpsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas,
- typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, upsertIndicatorVar,
+ typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, operationVar,
prevSecondaryKeys, prevAdditionalFilteringKey, inputDesc, context, spec, secondaryKeyPipelines);
break;
default:
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index b95971f..cd0d996 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -428,11 +428,11 @@
return new IndexBulkloadPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex());
} else {
- LogicalVariable upsertIndicatorVar = null;
+ LogicalVariable operationVar = null;
List<LogicalVariable> prevSecondaryKeys = null;
LogicalVariable prevAdditionalFilteringKey = null;
if (opInsDel.getOperation() == Kind.UPSERT) {
- upsertIndicatorVar = getKey(opInsDel.getUpsertIndicatorExpr().getValue());
+ operationVar = getKey(opInsDel.getOperationExpr().getValue());
prevSecondaryKeys = new ArrayList<>();
getKeys(opInsDel.getPrevSecondaryKeyExprs(), prevSecondaryKeys);
if (opInsDel.getPrevAdditionalFilteringExpression() != null) {
@@ -442,9 +442,8 @@
}
}
return new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
- opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(), upsertIndicatorVar,
- prevSecondaryKeys, prevAdditionalFilteringKey,
- opInsDel.getNumberOfAdditionalNonFilteringFields());
+ opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(), operationVar, prevSecondaryKeys,
+ prevAdditionalFilteringKey, opInsDel.getNumberOfAdditionalNonFilteringFields());
}
}