[NO ISSUE][IDX] Use an implicit DISTINCT for array index maintenance.
- user model changes: no
- storage format changes: no
- interface changes: no
Use the try-catch pattern to implement an implicit DISTINCT for
array index maintenance: duplicate-key errors on insert and
non-existent-key errors on delete are now caught and ignored. This
handles duplicates in a record's array without relying on a
blocking operator (previously an ORDER right before the DISTINCT).
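In sketch form (simplified from the runtime hunks below; the
accessor, tuple, and error-code names are the ones used in the
diff):

    try {
        workingLSMAccessor.forceInsert(endTupleReference);
    } catch (HyracksDataException e) {
        // A duplicate array element produces the same index entry
        // twice; swallowing DUPLICATE_KEY makes the insert behave
        // like an implicit DISTINCT.
        if (!e.matches(ErrorCode.DUPLICATE_KEY)) {
            throw e;
        }
    }

The delete path mirrors this, ignoring
UPDATE_OR_DELETE_NON_EXISTENT_KEY instead.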
Change-Id: I328bf4e80037a23b6d44139548436f6c4f232bba
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11723
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Glenn Galvizo <ggalvizo@uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index 81c6c2a..e567974 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -449,7 +449,7 @@
context.computeAndSetTypeEnvironmentForOperator(unnestSourceOp);
UnnestBranchCreator unnestSIDXBranch = buildUnnestBranch(unnestSourceOp, index, newRecordVar,
newMetaVar, recType, metaType, dataset.hasMetaPart());
- unnestSIDXBranch.applyProjectDistinct();
+ unnestSIDXBranch.applyProjectOnly();
// If there exists a filter expression, add it to the top of our nested plan.
filterExpression = (primaryIndexModificationOp.getOperation() == Kind.UPSERT) ? null
@@ -477,7 +477,7 @@
UnnestBranchCreator unnestBeforeSIDXBranch = buildUnnestBranch(unnestBeforeSourceOp, index,
primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar, recType,
metaType, dataset.hasMetaPart());
- unnestBeforeSIDXBranch.applyProjectDistinct();
+ unnestBeforeSIDXBranch.applyProjectOnly();
indexUpdate.getNestedPlans().add(unnestBeforeSIDXBranch.buildBranch());
}
} else if (index.getIndexType() == IndexType.ARRAY && isBulkload) {
@@ -1010,6 +1010,13 @@
return varRef;
}
+ public final void applyProjectOnly() throws AlgebricksException {
+ List<LogicalVariable> projectVars = new ArrayList<>(this.lastFieldVars);
+ ProjectOperator projectOperator = new ProjectOperator(projectVars);
+ projectOperator.setSourceLocation(sourceLoc);
+ this.currentTop = introduceNewOp(currentTop, projectOperator, true);
+ }
+
@SafeVarargs
public final void applyProjectDistinct(List<Mutable<ILogicalExpression>>... auxiliaryExpressions)
throws AlgebricksException {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryInsertDeleteWithNestedPlanOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryInsertDeleteWithNestedPlanOperatorNodePushable.java
index 0b36774..5712991 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryInsertDeleteWithNestedPlanOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryInsertDeleteWithNestedPlanOperatorNodePushable.java
@@ -31,13 +31,13 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
@@ -122,10 +122,8 @@
private class IndexTupleInsertDelete implements IFrameWriter {
private final RecordDescriptor inputRecordDescriptor;
private FrameTupleAccessor endOfPipelineTupleAccessor;
-
- // We are not writing the resulting tuple to a frame, we must store the result in an intermediate.
- private ArrayTupleBuilder arrayTupleBuilder;
- private ArrayTupleReference arrayTupleReference;
+ private FrameTupleReference endOfPipelineTupleReference;
+ private ConcatenatingTupleReference endTupleReference;
private IndexTupleInsertDelete(RecordDescriptor recordDescriptor) {
this.inputRecordDescriptor = recordDescriptor;
@@ -133,11 +131,9 @@
@Override
public void open() throws HyracksDataException {
- int numSecondaryKeys = inputRecordDescriptor.getFieldCount();
-
+ endTupleReference = new ConcatenatingTupleReference(2);
endOfPipelineTupleAccessor = new FrameTupleAccessor(inputRecordDescriptor);
- arrayTupleBuilder = new ArrayTupleBuilder(numberOfPrimaryKeyAndFilterFields + numSecondaryKeys);
- arrayTupleReference = new ArrayTupleReference();
+ endOfPipelineTupleReference = new FrameTupleReference();
}
@Override
@@ -147,25 +143,33 @@
endOfPipelineTupleAccessor.reset(buffer);
int nTuple = endOfPipelineTupleAccessor.getTupleCount();
for (int t = 0; t < nTuple; t++) {
+ endOfPipelineTupleReference.reset(endOfPipelineTupleAccessor, t);
+ endTupleReference.reset();
- // First, add the secondary keys.
- arrayTupleBuilder.reset();
- int nFields = endOfPipelineTupleAccessor.getFieldCount();
- for (int f = 0; f < nFields; f++) {
- arrayTupleBuilder.addField(endOfPipelineTupleAccessor, t, f);
- }
+ // Add the secondary keys.
+ endTupleReference.addTuple(endOfPipelineTupleReference);
- // Next, add the primary keys and filter fields.
- for (int f = 0; f < numberOfPrimaryKeyAndFilterFields; f++) {
- arrayTupleBuilder.addField(tuple.getFieldData(f), tuple.getFieldStart(f), tuple.getFieldLength(f));
- }
+ // Add the primary keys and filter fields.
+ endTupleReference.addTuple(tuple);
- // Finally, pass the tuple to our accessor. There are only two operations: insert or delete.
- arrayTupleReference.reset(arrayTupleBuilder.getFieldEndOffsets(), arrayTupleBuilder.getByteArray());
+ // Pass the tuple to our accessor. There are only two operations: insert or delete.
if (op.equals(IndexOperation.INSERT)) {
- workingLSMAccessor.forceInsert(arrayTupleReference);
+ try {
+ workingLSMAccessor.forceInsert(endTupleReference);
+ } catch (HyracksDataException e) {
+ if (!e.matches(org.apache.hyracks.api.exceptions.ErrorCode.DUPLICATE_KEY)) {
+ throw e;
+ }
+ }
+
} else {
- workingLSMAccessor.forceDelete(arrayTupleReference);
+ try {
+ workingLSMAccessor.forceDelete(endTupleReference);
+ } catch (HyracksDataException e) {
+ if (!e.matches(org.apache.hyracks.api.exceptions.ErrorCode.UPDATE_OR_DELETE_NON_EXISTENT_KEY)) {
+ throw e;
+ }
+ }
}
}
}
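Note: the ArrayTupleBuilder/ArrayTupleReference pair removed above
copied every field into an intermediate buffer. A minimal sketch of
the replacement, assuming the ConcatenatingTupleReference API imported
in this hunk; the ITupleReference parameters are illustrative:

    import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
    import org.apache.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;

    final class ConcatSketch {
        static ITupleReference concat(ITupleReference secondaryKeys,
                ITupleReference primaryKeysAndFilters) {
            // At most two underlying tuples: secondary keys first, then
            // primary keys + filter fields, matching the index entry layout.
            ConcatenatingTupleReference endTuple = new ConcatenatingTupleReference(2);
            endTuple.reset();
            endTuple.addTuple(secondaryKeys);         // no byte copying;
            endTuple.addTuple(primaryKeysAndFilters); // field access delegates
            return endTuple;                          // to the added tuples
        }
    }

Field accessors on the result resolve a field index to the underlying
tuple that holds it, so no bytes are materialized.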
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
index f1af496..303bece 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
@@ -33,13 +33,12 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
public class LSMSecondaryUpsertWithNestedPlanOperatorNodePushable extends LSMSecondaryUpsertOperatorNodePushable {
@@ -154,10 +153,7 @@
private FrameTupleAccessor endOfPipelineTupleAccessor;
private FrameTupleReference endOfPipelineTupleReference;
-
- // We are not writing the resulting tuple to a frame, we must store the result in an intermediate.
- private ArrayTupleBuilder arrayTupleBuilder;
- private ArrayTupleReference arrayTupleReference;
+ private ConcatenatingTupleReference endTupleReference;
private IndexTupleUnconditionalOperation(RecordDescriptor recordDescriptor, boolean isInsert) {
this.inputRecordDescriptor = recordDescriptor;
@@ -166,11 +162,9 @@
@Override
public void open() throws HyracksDataException {
- int numSecondaryKeys = inputRecordDescriptor.getFieldCount();
+ endTupleReference = new ConcatenatingTupleReference(2);
endOfPipelineTupleAccessor = new FrameTupleAccessor(inputRecordDescriptor);
endOfPipelineTupleReference = new FrameTupleReference();
- arrayTupleBuilder = new ArrayTupleBuilder(numberOfPrimaryKeyAndFilterFields + numSecondaryKeys);
- arrayTupleReference = new ArrayTupleReference();
}
@Override
@@ -180,33 +174,33 @@
endOfPipelineTupleAccessor.reset(buffer);
int nTuple = endOfPipelineTupleAccessor.getTupleCount();
for (int t = 0; t < nTuple; t++) {
-
endOfPipelineTupleReference.reset(endOfPipelineTupleAccessor, t);
+
+ // Do not perform operations w/ null or missing values (same behavior as atomic upserts).
if (hasNullOrMissing(endOfPipelineTupleReference)) {
- // Do not perform operations w/ null or missing values (same behavior as atomic upserts).
continue;
}
- // First, add the secondary keys.
- arrayTupleBuilder.reset();
- int nFields = endOfPipelineTupleAccessor.getFieldCount();
- for (int f = 0; f < nFields; f++) {
- arrayTupleBuilder.addField(endOfPipelineTupleAccessor, t, f);
- }
+ // Add the secondary keys.
+ endTupleReference.reset();
+ endTupleReference.addTuple(endOfPipelineTupleReference);
- // Next, add the primary keys and filter fields.
- for (int f = 0; f < numberOfPrimaryKeyAndFilterFields; f++) {
- arrayTupleBuilder.addField(tuple.getFieldData(f), tuple.getFieldStart(f), tuple.getFieldLength(f));
- }
+ // Add the primary keys and filter fields.
+ endTupleReference.addTuple(tuple);
// Finally, pass the tuple to our accessor. There are only two operations: insert or delete.
- arrayTupleReference.reset(arrayTupleBuilder.getFieldEndOffsets(), arrayTupleBuilder.getByteArray());
if (this.isInsert) {
abstractModCallback.setOp(AbstractIndexModificationOperationCallback.Operation.INSERT);
- workingLSMAccessor.forceInsert(arrayTupleReference);
+ try {
+ workingLSMAccessor.forceInsert(endTupleReference);
+ } catch (HyracksDataException e) {
+ if (!e.matches(org.apache.hyracks.api.exceptions.ErrorCode.DUPLICATE_KEY)) {
+ throw e;
+ }
+ }
} else {
abstractModCallback.setOp(AbstractIndexModificationOperationCallback.Operation.DELETE);
- workingLSMAccessor.forceDelete(arrayTupleReference);
+ workingLSMAccessor.forceDelete(endTupleReference);
}
}
}