Fix Upsert to Never Enforce the First Operation
Change-Id: I8ec784e2d6ff39758ab701d4f36fc85c278178f2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1336
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index 96f9e76..536366f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -24,12 +24,14 @@
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
+import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
@@ -80,6 +82,7 @@
private final boolean hasMeta;
private final int filterFieldIndex;
private final int metaFieldIndex;
+ private LockThenSearchOperationCallback searchCallback;
public AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
@@ -140,8 +143,9 @@
modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
index, ctx, this);
- indexAccessor = index.createAccessor(modCallback, opDesc.getSearchOpCallbackFactory()
- .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
+ searchCallback = (LockThenSearchOperationCallback) opDesc.getSearchOpCallbackFactory()
+ .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this);
+ indexAccessor = index.createAccessor(modCallback, searchCallback);
cursor = indexAccessor.createSearchCursor(false);
frameTuple = new FrameTupleReference();
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
@@ -167,6 +171,12 @@
}
if (recordWasInserted || recordWasDeleted) {
FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+ } else {
+ try {
+ searchCallback.release();
+ } catch (ACIDException e) {
+ throw new HyracksDataException(e);
+ }
}
}
@@ -185,6 +195,7 @@
accessor.reset(buffer);
LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
int tupleCount = accessor.getTupleCount();
+ boolean firstModification = true;
int i = 0;
try {
while (i < tupleCount) {
@@ -217,8 +228,9 @@
tb.addFieldEndOffset();
}
modCallback.setOp(Operation.DELETE);
- if (i == 0) {
+ if (firstModification) {
lsmAccessor.delete(prevTuple);
+ firstModification = false;
} else {
lsmAccessor.forceDelete(prevTuple);
}
@@ -236,8 +248,9 @@
}
if (!isNull(tuple, numOfPrimaryKeys)) {
modCallback.setOp(Operation.INSERT);
- if ((prevTuple == null) && (i == 0)) {
+ if (firstModification) {
lsmAccessor.insert(tuple);
+ firstModification = false;
} else {
lsmAccessor.forceInsert(tuple);
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
index ef3b218..288e7a5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -42,6 +42,7 @@
private final LSMIndexInsertUpdateDeleteOperatorNodePushable operatorNodePushable;
private final ILogManager logManager;
private final ILogRecord logRecord;
+ private int pkHash;
public LockThenSearchOperationCallback(int datasetId, int[] entityIdFields, ITransactionSubsystem txnSubsystem,
ITransactionContext txnCtx, IOperatorNodePushable operatorNodePushable) {
@@ -75,7 +76,7 @@
@Override
public void before(ITupleReference tuple) throws HyracksDataException {
- int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
+ pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
try {
if (operatorNodePushable != null) {
@@ -122,4 +123,8 @@
private void logWait() throws ACIDException {
logManager.log(logRecord);
}
+
+ public void release() throws ACIDException {
+ lockManager.unlock(datasetId, pkHash, LockMode.X, txnCtx);
+ }
}