[ASTERIXDB-2429] Fix the upsert of primary key index
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Previously the primary key index is not properly maintained during
upsert. Since there is no secondary key in the primary key index, the
old value would always point to the primary key, which is always equal
to the new value. As a result, the primary key index is nevered
maintainined during upsert.
- This patch fixes this bug with two changes:
First, if there is a primary key index, we would perform upsert anyway
no matter whether old value == new value
Second, use a boolean variable to indicate whether the operation
is upsert or delete since for the primary key index, old value cannot
provide such information.
Change-Id: I925bd42ba67f70e94f5f5bc2d24151c8e2e20baf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2825
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index ae74832..e123715 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -496,6 +496,12 @@
indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assignCoordinates));
}
}
+
+ if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
+ indexUpdate.setUpsertIndicatorExpr(new MutableObject<>(
+ new VariableReferenceExpression(primaryIndexModificationOp.getUpsertIndicatorVar())));
+ }
+
context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
if (!primaryIndexModificationOp.isBulkload() || secondaryIndexTotalCnt == 1) {
currentTop = indexUpdate;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index aa4fb75..2c1e0f7 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -92,6 +92,7 @@
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.FunctionInfo;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
@@ -522,6 +523,8 @@
// A change feed, we don't need the assign to access PKs
upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading, metaExpSingletonList,
InsertDeleteUpsertOperator.Kind.UPSERT, false);
+ upsertOp.setUpsertIndicatorVar(context.newVar());
+ upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
// Create and add a new variable used for representing the original record
upsertOp.setPrevRecordVar(context.newVar());
upsertOp.setPrevRecordType(targetDatasource.getItemType());
@@ -556,6 +559,8 @@
upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
upsertOp.getInputs().add(new MutableObject<>(assign));
upsertOp.setSourceLocation(sourceLoc);
+ upsertOp.setUpsertIndicatorVar(context.newVar());
+ upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
// Create and add a new variable used for representing the original record
ARecordType recordType = (ARecordType) targetDatasource.getItemType();
upsertOp.setPrevRecordVar(context.newVar());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index b74d739..483987c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -383,8 +383,8 @@
// Gets the primary key permutation for upserts.
private static int[] getPrimaryKeyPermutationForUpsert(Dataset dataset) {
- // prev record first
- int f = 1;
+ // upsertIndicatorVar + prev record
+ int f = 2;
// add the previous meta second
if (dataset.hasMetaPart()) {
f++;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.ddl.sqlpp
new file mode 100644
index 0000000..332aa0d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.ddl.sqlpp
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Create a change feed with meta-data, create a primary key index and then ingest data (with deletes)
+ * Expected Res : Success
+ * Date : 18th Jun 2018
+ */
+
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+`key`:string,
+vbucket:int32,
+seq:int64,
+cas:int64,
+expiration:int32,
+flags:int32,
+revSeq:int64,
+lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType)primary key meta().`key`;
+create primary index primary_idx on KVStore;
+
+create feed KVChangeStream with {
+ "adapter-name" : "adapter",
+ "type-name" : "DocumentType",
+ "meta-type-name" : "KVMetaType",
+ "reader" : "org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory",
+ "parser" : "record-with-metadata",
+ "format" : "dcp",
+ "record-format" : "json",
+ "change-feed" : "true",
+ "key-indexes" : "0",
+ "key-indicators" : "1",
+ "num-of-records" : "1000",
+ "delete-cycle" : "5"
+};
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.2.update.sqlpp
new file mode 100644
index 0000000..03a83ff
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.2.update.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use KeyVerse;
+
+set `wait-for-completion-feed` "true";
+connect feed KVChangeStream to dataset KVStore;
+
+start feed KVChangeStream;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.3.query.sqlpp
new file mode 100644
index 0000000..6e29992
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.3.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use KeyVerse;
+
+select count(*)
+from KVStore x;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.4.ddl.sqlpp
new file mode 100644
index 0000000..89469bd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.4.ddl.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/upsert/primary-secondary-btree/primary-secondary-btree.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/upsert/primary-secondary-btree/primary-secondary-btree.4.query.sqlpp
new file mode 100644
index 0000000..3f08b2f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/upsert/primary-secondary-btree/primary-secondary-btree.4.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Upsert into a dataset which has a b-tree secondary index
+ * Expected Res : Success
+ * Date : Sep 15th 2015
+ */
+
+use test;
+
+select count(*)
+from UpsertTo x;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.adm
new file mode 100644
index 0000000..d0d0910
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-index/change-feed-with-meta-pk-index.1.adm
@@ -0,0 +1 @@
+{ "$1": 804 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/upsert/primary-secondary-btree/primary-secondary-btree.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/upsert/primary-secondary-btree/primary-secondary-btree.2.adm
new file mode 100644
index 0000000..71c9709
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/upsert/primary-secondary-btree/primary-secondary-btree.2.adm
@@ -0,0 +1 @@
+{ "$1": 9 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 9ca1fbd..dedabbe 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -8865,6 +8865,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="feeds">
+ <compilation-unit name="change-feed-with-meta-pk-index">
+ <output-dir compare="Text">change-feed-with-meta-pk-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="feeds">
<compilation-unit name="change-feed-with-meta-with-mixed-index">
<output-dir compare="Text">change-feed-with-meta-with-mixed-index</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 695dcb8..b5bcece 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -57,6 +57,7 @@
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedConstants;
import org.apache.asterix.formats.base.IDataFormat;
+import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.LinearizeComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
@@ -668,7 +669,7 @@
boolean bulkload) throws AlgebricksException {
return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema,
inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
- context, spec, bulkload, null, null);
+ context, spec, bulkload, null, null, null);
}
@Override
@@ -680,7 +681,7 @@
throws AlgebricksException {
return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema,
inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc,
- context, spec, false, null, null);
+ context, spec, false, null, null, null);
}
@Override
@@ -688,12 +689,12 @@
IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
- ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
+ ILogicalExpression filterExpr, LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys,
LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec) throws AlgebricksException {
return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema,
inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, recordDesc,
- context, spec, false, prevSecondaryKeys, prevAdditionalFilteringKey);
+ context, spec, false, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKey);
}
@Override
@@ -1042,8 +1043,8 @@
List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr,
RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec, boolean bulkload,
- List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey)
- throws AlgebricksException {
+ LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys,
+ LogicalVariable prevAdditionalFilteringKey) throws AlgebricksException {
String indexName = dataSourceIndex.getId();
String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
@@ -1062,18 +1063,19 @@
case BTREE:
return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp,
- bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
+ bulkload, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
case RTREE:
return getRTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp,
- bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
+ bulkload, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
case SINGLE_PARTITION_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
case LENGTH_PARTITIONED_WORD_INVIX:
case LENGTH_PARTITIONED_NGRAM_INVIX:
return getInvertedIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp,
- secondaryIndex.getIndexType(), bulkload, prevSecondaryKeys, prevAdditionalFilteringKeys);
+ secondaryIndex.getIndexType(), bulkload, upsertIndicatorVar, prevSecondaryKeys,
+ prevAdditionalFilteringKeys);
default:
throw new AlgebricksException(
indexOp.name() + "Insert, upsert, and delete not implemented for index type: "
@@ -1085,8 +1087,9 @@
String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
AsterixTupleFilterFactory filterFactory, RecordDescriptor inputRecordDesc, JobGenContext context,
- JobSpecification spec, IndexOperation indexOp, boolean bulkload, List<LogicalVariable> prevSecondaryKeys,
- List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
+ JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable upsertIndicatorVar,
+ List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
+ throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
int numKeys = primaryKeys.size() + secondaryKeys.size();
int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
@@ -1153,8 +1156,10 @@
GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null,
BulkLoadUsage.LOAD, dataset.getDatasetId());
} else if (indexOp == IndexOperation.UPSERT) {
+ int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
- filterFactory, modificationCallbackFactory, prevFieldPermutation);
+ filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex,
+ BinaryBooleanInspector.FACTORY, prevFieldPermutation);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
filterFactory, false, modificationCallbackFactory);
@@ -1169,8 +1174,9 @@
String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec, IndexOperation indexOp, boolean bulkload, List<LogicalVariable> prevSecondaryKeys,
- List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
+ JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable upsertIndicatorVar,
+ List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
+ throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
String itemTypeName = dataset.getItemTypeName();
IAType itemType = MetadataManager.INSTANCE
@@ -1250,8 +1256,10 @@
GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataflowHelperFactory,
null, BulkLoadUsage.LOAD, dataset.getDatasetId());
} else if (indexOp == IndexOperation.UPSERT) {
+ int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation,
- indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, prevFieldPermutation);
+ indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex,
+ BinaryBooleanInspector.FACTORY, prevFieldPermutation);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory);
@@ -1264,8 +1272,8 @@
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec, IndexOperation indexOp, IndexType indexType, boolean bulkload,
- List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
- throws AlgebricksException {
+ LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys,
+ List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
// Check the index is length-partitioned or not.
boolean isPartitioned;
if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
@@ -1359,8 +1367,10 @@
GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory,
null, BulkLoadUsage.LOAD, dataset.getDatasetId());
} else if (indexOp == IndexOperation.UPSERT) {
+ int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory,
- filterFactory, modificationCallbackFactory, prevFieldPermutation);
+ filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex,
+ BinaryBooleanInspector.FACTORY, prevFieldPermutation);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
indexDataFlowFactory, filterFactory, false, modificationCallbackFactory);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index bbdfadf..28c612f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -407,15 +407,20 @@
IIndexDataflowHelperFactory idfh =
new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
LSMPrimaryUpsertOperatorDescriptor op;
- ITypeTraits[] outputTypeTraits =
- new ITypeTraits[inputRecordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
- ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount()
+ ITypeTraits[] outputTypeTraits = new ITypeTraits[inputRecordDesc.getFieldCount() + 1
+ + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+ ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount() + 1
+ (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
IDataFormat dataFormat = metadataProvider.getDataFormat();
- // add the previous record first
int f = 0;
+ // add the upsert indicator var
+ outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(BuiltinType.ABOOLEAN);
+ outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(BuiltinType.ABOOLEAN);
+ f++;
+ // add the previous record
outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(itemType);
+ outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(itemType);
f++;
// add the previous meta second
if (dataset.hasMetaPart()) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 3df1b13..ba8074fe 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
+import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -158,11 +159,13 @@
if (cursor.hasNext()) {
cursor.next();
prevTuple = cursor.getTuple();
+ appendUpsertIndicator(!isDelete);
appendFilterToPrevTuple();
appendPrevRecord();
appendPreviousMeta();
appendFilterToOutput();
} else {
+ appendUpsertIndicator(!isDelete);
appendPreviousTupleAsMissing();
}
} finally {
@@ -170,6 +173,7 @@
}
} else {
searchCallback.before(key); // lock
+ appendUpsertIndicator(!isDelete);
appendPreviousTupleAsMissing();
}
if (isDelete && prevTuple != null) {
@@ -330,6 +334,11 @@
}
}
+ private void appendUpsertIndicator(boolean isUpsert) throws IOException {
+ recordDesc.getFields()[0].serialize(isUpsert ? ABoolean.TRUE : ABoolean.FALSE, dos);
+ tb.addFieldEndOffset();
+ }
+
private void appendPrevRecord() throws IOException {
dos.write(prevTuple.getFieldData(numOfPrimaryKeys), prevTuple.getFieldStart(numOfPrimaryKeys),
prevTuple.getFieldLength(numOfPrimaryKeys));
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
index 958288a..df658b6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
@@ -19,6 +19,7 @@
package org.apache.asterix.runtime.operators;
import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -34,14 +35,19 @@
private static final long serialVersionUID = 1L;
private final int[] prevValuePermutation;
+ private final int upsertIndiatorFieldIndex;
+ private final IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory;
public LSMSecondaryUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory,
+ int upsertIndicatorFieldIndex, IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory,
int[] prevValuePermutation) {
super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, indexHelperFactory, tupleFilterFactory, false,
modificationOpCallbackFactory);
this.prevValuePermutation = prevValuePermutation;
+ this.upsertIndiatorFieldIndex = upsertIndicatorFieldIndex;
+ this.upsertIndicatorInspectorFactory = upsertIndicatorInspectorFactory;
}
@Override
@@ -49,6 +55,7 @@
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
RecordDescriptor intputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
return new LSMSecondaryUpsertOperatorNodePushable(ctx, partition, indexHelperFactory, modCallbackFactory,
- tupleFilterFactory, fieldPermutation, intputRecDesc, prevValuePermutation);
+ tupleFilterFactory, fieldPermutation, intputRecDesc, upsertIndiatorFieldIndex,
+ upsertIndicatorInspectorFactory, prevValuePermutation);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index b928131..2dc7f5e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -19,15 +19,19 @@
package org.apache.asterix.runtime.operators;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.TypeTagUtil;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
+import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.utils.TupleUtils;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
@@ -56,22 +60,31 @@
public class LSMSecondaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
private final PermutingFrameTupleReference prevValueTuple = new PermutingFrameTupleReference();
+ private final int upsertIndicatorFieldIndex;
+ private final IBinaryBooleanInspector upsertIndicatorInspector;
private final int numberOfFields;
private AbstractIndexModificationOperationCallback abstractModCallback;
+ private final boolean isPrimaryKeyIndex;
public LSMSecondaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, IModificationOperationCallbackFactory modCallbackFactory,
ITupleFilterFactory tupleFilterFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
+ int upsertIndicatorFieldIndex, IBinaryBooleanInspectorFactory upsertIndicatorInspectorFactory,
int[] prevValuePermutation) throws HyracksDataException {
super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
modCallbackFactory, tupleFilterFactory);
this.prevValueTuple.setFieldPermutation(prevValuePermutation);
+ this.upsertIndicatorFieldIndex = upsertIndicatorFieldIndex;
+ this.upsertIndicatorInspector = upsertIndicatorInspectorFactory.createBinaryBooleanInspector(ctx);
this.numberOfFields = prevValuePermutation.length;
+ // a primary key index only has primary keys, and thus these two permutations are the same
+ this.isPrimaryKeyIndex = Arrays.equals(fieldPermutation, prevValuePermutation);
}
@Override
public void open() throws HyracksDataException {
super.open();
+ frameTuple = new FrameTupleReference();
abstractModCallback = (AbstractIndexModificationOperationCallback) modCallback;
}
@@ -82,9 +95,15 @@
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; i++) {
try {
+ frameTuple.reset(accessor, i);
+ boolean isUpsert =
+ upsertIndicatorInspector.getBooleanValue(frameTuple.getFieldData(upsertIndicatorFieldIndex),
+ frameTuple.getFieldStart(upsertIndicatorFieldIndex),
+ frameTuple.getFieldLength(upsertIndicatorFieldIndex));
// if both previous value and new value are null, then we skip
tuple.reset(accessor, i);
prevValueTuple.reset(accessor, i);
+
boolean isNewValueMissing = isMissing(tuple, 0);
boolean isOldValueMissing = isMissing(prevValueTuple, 0);
if (isNewValueMissing && isOldValueMissing) {
@@ -92,8 +111,10 @@
continue;
}
// At least, one is not null
- // If they are equal, then we skip
- if (TupleUtils.equalTuples(tuple, prevValueTuple, numberOfFields)) {
+ if (!isPrimaryKeyIndex && TupleUtils.equalTuples(tuple, prevValueTuple, numberOfFields)) {
+ // For a secondary index, if the secondary key values do not change, we can skip upserting it.
+ // However, for a primary key index, we cannot do this because it only contains primary keys
+ // which are always the same
continue;
}
if (!isOldValueMissing) {
@@ -101,7 +122,7 @@
abstractModCallback.setOp(Operation.DELETE);
lsmAccessor.forceDelete(prevValueTuple);
}
- if (!isNewValueMissing) {
+ if (isUpsert && !isNewValueMissing) {
// we need to insert the new value
abstractModCallback.setOp(Operation.INSERT);
lsmAccessor.forceInsert(tuple);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index efa9c1c..3d004a2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -205,8 +205,9 @@
IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalFilteringKeys, ILogicalExpression filterExpr,
- List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKeys,
- RecordDescriptor inputDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException;
+ LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys,
+ LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor inputDesc, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException;
public ITupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
index 31a1294..154fb13 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
@@ -49,6 +49,7 @@
// used for upsert operations
private List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs;
private Mutable<ILogicalExpression> prevAdditionalFilteringExpression;
+ private Mutable<ILogicalExpression> upsertIndicatorExpr;
private final int numberOfAdditionalNonFilteringFields;
public IndexInsertDeleteUpsertOperator(IDataSourceIndex<?, ?> dataSourceIndex,
@@ -93,6 +94,12 @@
}
}
}
+
+ // Upsert indicator var <For upsert>
+ if (upsertIndicatorExpr != null && visitor.transform(upsertIndicatorExpr)) {
+ b = true;
+ }
+
// Old secondary <For upsert>
if (prevSecondaryKeyExprs != null) {
for (int i = 0; i < prevSecondaryKeyExprs.size(); i++) {
@@ -189,4 +196,12 @@
public int getNumberOfAdditionalNonFilteringFields() {
return numberOfAdditionalNonFilteringFields;
}
+
+ public Mutable<ILogicalExpression> getUpsertIndicatorExpr() {
+ return upsertIndicatorExpr;
+ }
+
+ public void setUpsertIndicatorExpr(Mutable<ILogicalExpression> upsertIndicatorExpr) {
+ this.upsertIndicatorExpr = upsertIndicatorExpr;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
index 9838c12..ae90462 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -59,6 +59,9 @@
// previous additional fields (for UPSERT)
private List<LogicalVariable> prevAdditionalNonFilteringVars;
private List<Object> prevAdditionalNonFilteringTypes;
+ // a boolean variable that indicates whether it's a delete operation (false) or upsert operation (true)
+ private LogicalVariable upsertIndicatorVar;
+ private Object upsertIndicatorVarType;
public InsertDeleteUpsertOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
List<Mutable<ILogicalExpression>> primaryKeyExprs,
@@ -85,6 +88,7 @@
public void recomputeSchema() throws AlgebricksException {
schema = new ArrayList<LogicalVariable>();
if (operation == Kind.UPSERT) {
+ schema.add(upsertIndicatorVar);
// The upsert case also produces the previous record
schema.add(prevRecordVar);
if (additionalNonFilteringExpressions != null) {
@@ -98,6 +102,9 @@
}
public void getProducedVariables(Collection<LogicalVariable> producedVariables) {
+ if (upsertIndicatorVar != null) {
+ producedVariables.add(upsertIndicatorVar);
+ }
if (prevRecordVar != null) {
producedVariables.add(prevRecordVar);
}
@@ -147,6 +154,7 @@
public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
throws AlgebricksException {
if (operation == Kind.UPSERT) {
+ target.addVariable(upsertIndicatorVar);
target.addVariable(prevRecordVar);
if (prevAdditionalNonFilteringVars != null) {
for (LogicalVariable var : prevAdditionalNonFilteringVars) {
@@ -171,6 +179,7 @@
public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
PropagatingTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
if (operation == Kind.UPSERT) {
+ env.setVarType(upsertIndicatorVar, upsertIndicatorVarType);
env.setVarType(prevRecordVar, prevRecordType);
if (prevAdditionalNonFilteringVars != null) {
for (int i = 0; i < prevAdditionalNonFilteringVars.size(); i++) {
@@ -224,6 +233,22 @@
this.prevRecordVar = prevRecordVar;
}
+ public LogicalVariable getUpsertIndicatorVar() {
+ return upsertIndicatorVar;
+ }
+
+ public void setUpsertIndicatorVar(LogicalVariable upsertIndicatorVar) {
+ this.upsertIndicatorVar = upsertIndicatorVar;
+ }
+
+ public Object getUpsertIndicatorVarType() {
+ return upsertIndicatorVarType;
+ }
+
+ public void setUpsertIndicatorVarType(Object upsertIndicatorVarType) {
+ this.upsertIndicatorVarType = upsertIndicatorVarType;
+ }
+
public void setPrevRecordType(Object recordType) {
prevRecordType = recordType;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index d57a998..e66809e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -421,6 +421,9 @@
e.getValue().getUsedVariables(usedVariables);
}
}
+ if (op.getUpsertIndicatorExpr() != null) {
+ op.getUpsertIndicatorExpr().getValue().getUsedVariables(usedVariables);
+ }
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index 92fa86d..228ca52 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -54,14 +54,16 @@
private final ILogicalExpression filterExpr;
private final IDataSourceIndex<?, ?> dataSourceIndex;
private final List<LogicalVariable> additionalFilteringKeys;
+ private final LogicalVariable upsertIndicatorVar;
private final List<LogicalVariable> prevSecondaryKeys;
private final LogicalVariable prevAdditionalFilteringKey;
private final int numOfAdditionalNonFilteringFields;
public IndexInsertDeleteUpsertPOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
- IDataSourceIndex<?, ?> dataSourceIndex, List<LogicalVariable> prevSecondaryKeys,
- LogicalVariable prevAdditionalFilteringKey, int numOfAdditionalNonFilteringFields) {
+ IDataSourceIndex<?, ?> dataSourceIndex, LogicalVariable upsertIndicatorVar,
+ List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey,
+ int numOfAdditionalNonFilteringFields) {
this.primaryKeys = primaryKeys;
this.secondaryKeys = secondaryKeys;
if (filterExpr != null) {
@@ -71,6 +73,7 @@
}
this.dataSourceIndex = dataSourceIndex;
this.additionalFilteringKeys = additionalFilteringKeys;
+ this.upsertIndicatorVar = upsertIndicatorVar;
this.prevSecondaryKeys = prevSecondaryKeys;
this.prevAdditionalFilteringKey = prevAdditionalFilteringKey;
this.numOfAdditionalNonFilteringFields = numOfAdditionalNonFilteringFields;
@@ -132,8 +135,8 @@
break;
case UPSERT:
runtimeAndConstraints = mp.getIndexUpsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas,
- typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, prevSecondaryKeys,
- prevAdditionalFilteringKey, inputDesc, context, spec);
+ typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, upsertIndicatorVar,
+ prevSecondaryKeys, prevAdditionalFilteringKey, inputDesc, context, spec);
break;
default:
throw new AlgebricksException("Unsupported Operation " + operation);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index d277043..4869761 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -356,9 +356,11 @@
new IndexBulkloadPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
} else {
+ LogicalVariable upsertIndicatorVar = null;
List<LogicalVariable> prevSecondaryKeys = null;
LogicalVariable prevAdditionalFilteringKey = null;
if (opInsDel.getOperation() == Kind.UPSERT) {
+ upsertIndicatorVar = getKey(opInsDel.getUpsertIndicatorExpr().getValue());
prevSecondaryKeys = new ArrayList<LogicalVariable>();
getKeys(opInsDel.getPrevSecondaryKeyExprs(), prevSecondaryKeys);
if (opInsDel.getPrevAdditionalFilteringExpression() != null) {
@@ -369,7 +371,7 @@
}
op.setPhysicalOperator(new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys,
additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(),
- prevSecondaryKeys, prevAdditionalFilteringKey,
+ upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKey,
opInsDel.getNumberOfAdditionalNonFilteringFields()));
}
break;
@@ -407,14 +409,17 @@
private static void getKeys(List<Mutable<ILogicalExpression>> keyExpressions, List<LogicalVariable> keys) {
for (Mutable<ILogicalExpression> kExpr : keyExpressions) {
- ILogicalExpression e = kExpr.getValue();
- if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- throw new NotImplementedException();
- }
- keys.add(((VariableReferenceExpression) e).getVariableReference());
+ keys.add(getKey(kExpr.getValue()));
}
}
+ private static LogicalVariable getKey(ILogicalExpression keyExpression) {
+ if (keyExpression.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new NotImplementedException();
+ }
+ return ((VariableReferenceExpression) keyExpression).getVariableReference();
+ }
+
private static LogicalVariable getKeysAndLoad(Mutable<ILogicalExpression> payloadExpr,
List<Mutable<ILogicalExpression>> keyExpressions, List<LogicalVariable> keys) {
LogicalVariable payload;