[NO ISSUE][TX] Partition level atomicity implementation
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details:
With this change, inserts/upserts/deletes on datasets
created without any type will have partition level
atomicity. To acheieve this we have made the following
changes:
- Only one insert/upsert/delete at a time on the dataset.
- Queries do not read from memory components and directly
go to valid disk components.
- No locks taken by either writes or reads
- No logs written.
- The disk components generated by the operation are marked
as valid and checkpoint is updated on operation completion
and not on flush.
Change-Id: I8ed0fac37e026c909c986e6d844c3fae3833911e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17559
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java
index 077b657..e754c77 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java
@@ -31,6 +31,7 @@
import org.apache.asterix.common.context.DatasetInfoProvider;
import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.ioopcallbacks.AtomicLSMIndexIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMIndexPageWriteCallbackFactory;
import org.apache.asterix.common.library.LibraryDescriptor;
@@ -190,6 +191,8 @@
// ILSMOperationTrackerFactory
registeredClasses.put("NoOpIOOperationCallbackFactory", NoOpIOOperationCallbackFactory.class);
registeredClasses.put("LSMBTreeIOOperationCallbackFactory", LSMIndexIOOperationCallbackFactory.class);
+ registeredClasses.put("AtomicLSMBTreeIOOperationCallbackFactory",
+ AtomicLSMIndexIOOperationCallbackFactory.class);
registeredClasses.put("LSMIndexPageWriteCallbackFactory", LSMIndexPageWriteCallbackFactory.class);
registeredClasses.put("NoOpPageWriteCallbackFactory", NoOpPageWriteCallbackFactory.class);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
index 250c25f..8f482ee 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
@@ -23,6 +23,7 @@
import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -34,8 +35,8 @@
private final List<ITestOpCallback<Void>> callbacks = new ArrayList<>();
public TestPrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
- ILSMComponentIdGenerator idGenerator) {
- super(datasetID, partition, logManager, dsInfo, idGenerator);
+ ILSMComponentIdGenerator idGenerator, IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
+ super(datasetID, partition, logManager, dsInfo, idGenerator, indexCheckpointManagerProvider);
}
public void addCallback(ITestOpCallback<Void> callback) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index 38fdf56..8cf708b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -60,7 +60,8 @@
Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
opTracker = new TestPrimaryIndexOperationTracker(datasetId, partition,
appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(),
- dslcManager.getComponentIdGenerator(datasetId, partition, resource.getPath()));
+ dslcManager.getComponentIdGenerator(datasetId, partition, resource.getPath()),
+ appCtx.getIndexCheckpointManagerProvider());
replaceMapEntry(opTrackersField, dsr, partition, opTracker);
}
return opTracker;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 117b4fc..f9c410b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -339,7 +339,7 @@
ILSMComponentIdGenerator idGenerator =
new LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum(), lastValidId);
PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(dataset.getDatasetID(), partition,
- logManager, dataset.getDatasetInfo(), idGenerator);
+ logManager, dataset.getDatasetInfo(), idGenerator, indexCheckpointManagerProvider);
dataset.setPrimaryIndexOperationTracker(partition, opTracker);
dataset.setIdGenerator(partition, idGenerator);
}
@@ -424,7 +424,12 @@
return;
}
// ensure all in-flight flushes gets scheduled
- logManager.log(waitLog);
+ final boolean requiresWaitLog =
+ dsInfo.getIndexes().values().stream().noneMatch(indexInfo -> indexInfo.getIndex().isAtomic());
+ if (requiresWaitLog) {
+ logManager.log(waitLog);
+ }
+
for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
if (!partitions.test(primaryOpTracker.getPartition())) {
continue;
@@ -439,7 +444,9 @@
primaryOpTracker.flushIfNeeded();
}
// ensure requested flushes were scheduled
- logManager.log(waitLog);
+ if (requiresWaitLog) {
+ logManager.log(waitLog);
+ }
if (!asyncFlush) {
List<FlushOperation> flushes = new ArrayList<>();
for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 2704e64..d9456d2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -28,14 +28,18 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.transactions.AbstractOperationCallback;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -50,6 +54,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
@@ -70,14 +75,17 @@
private boolean flushLogCreated = false;
private final Map<String, FlushOperation> scheduledFlushes = new HashMap<>();
private long lastFlushTime = System.nanoTime();
+ private final Map<String, FlushOperation> lastFlushOperation = new HashMap<>();
+ private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
- ILSMComponentIdGenerator idGenerator) {
+ ILSMComponentIdGenerator idGenerator, IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
super(datasetID, dsInfo);
this.partition = partition;
this.logManager = logManager;
this.numActiveOperations = new AtomicInteger();
this.idGenerator = idGenerator;
+ this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
}
@Override
@@ -101,7 +109,7 @@
}
public synchronized void flushIfNeeded() throws HyracksDataException {
- if (canSafelyFlush()) {
+ if (canSafelyFlush() && !isFlushLogCreated()) {
flushIfRequested();
}
}
@@ -163,9 +171,16 @@
synchronized (opTracker) {
ILSMMemoryComponent memComponent = lsmIndex.getCurrentMemoryComponent();
if (memComponent.getWriterCount() > 0) {
- throw new IllegalStateException(
- "Can't request a flush on a component with writers inside: Index:" + lsmIndex
- + " Component:" + memComponent);
+ if (lsmIndex.isAtomic()) {
+ LOGGER.debug(
+ "Can't request a flush on a component with writers inside: Index: {} Component: {}",
+ lsmIndex, memComponent);
+ return;
+ } else {
+ throw new IllegalStateException(
+ "Can't request a flush on a component with writers inside: Index:" + lsmIndex
+ + " Component:" + memComponent);
+ }
}
if (memComponent.getState() == ComponentState.READABLE_WRITABLE && memComponent.isModified()) {
memComponent.setUnwritable();
@@ -180,7 +195,7 @@
+ " and partition " + partition + " and is modified but its component id is null");
}
LogRecord logRecord = new LogRecord();
- if (dsInfo.isDurable()) {
+ if (dsInfo.isDurable() && !primaryLsmIndex.isAtomic()) {
/*
* Generate a FLUSH log.
* Flush will be triggered when the log is written to disk by LogFlusher.
@@ -194,7 +209,9 @@
}
flushLogCreated = true;
} else {
- //trigger flush for temporary indexes without generating a FLUSH log.
+ // trigger flush for temporary indexes and indexes on datasets with atomic statements enabled without
+ // generating a FLUSH log.
+ flushLogCreated = true;
triggerScheduleFlush(logRecord);
}
}
@@ -221,6 +238,16 @@
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
+ for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
+ if (lsmIndex.isPrimaryIndex()) {
+ if (lsmIndex.isCurrentMutableComponentEmpty()) {
+ LOGGER.debug("Primary index on dataset {} and partition {} is empty... skipping flush",
+ dsInfo.getDatasetID(), partition);
+ return;
+ }
+ break;
+ }
+ }
synchronized (scheduledFlushes) {
for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -228,6 +255,9 @@
ILSMIOOperation flush = accessor.scheduleFlush();
lastFlushTime = System.nanoTime();
scheduledFlushes.put(flush.getTarget().getRelativePath(), (FlushOperation) flush);
+ if (lsmIndex.isAtomic()) {
+ lastFlushOperation.put(lsmIndex.getIndexIdentifier(), (FlushOperation) flush);
+ }
flush.addCompleteListener(this);
}
}
@@ -236,6 +266,39 @@
}
}
+ public void commit() throws HyracksDataException {
+ LogRecord logRecord = new LogRecord();
+ triggerScheduleFlush(logRecord);
+ List<FlushOperation> flushes = new ArrayList<>(getScheduledFlushes());
+ LSMIndexUtil.waitFor(flushes);
+
+ Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
+ for (ILSMIndex lsmIndex : indexes) {
+ lsmIndex.commit();
+ }
+ for (FlushOperation flush : lastFlushOperation.values()) {
+ FileReference target = flush.getTarget();
+ Map<String, Object> map = flush.getParameters();
+ final LSMComponentId id = (LSMComponentId) map.get(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID);
+ final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
+ final long componentSequence = IndexComponentFileReference.of(ref.getName()).getSequenceEnd();
+ indexCheckpointManagerProvider.get(ref).flushed(componentSequence, 0L, id.getMaxId());
+ }
+ lastFlushOperation.clear();
+
+ for (ILSMIndex lsmIndex : indexes) {
+ lsmIndex.getMergePolicy().diskComponentAdded(lsmIndex, false);
+ }
+ }
+
+ public void abort() throws HyracksDataException {
+ Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
+ for (ILSMIndex lsmIndex : indexes) {
+ lsmIndex.abort();
+ }
+ lastFlushOperation.clear();
+ }
+
@Override
public void completed(ILSMIOOperation operation) {
synchronized (scheduledFlushes) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index f9934f2..421fb7b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
import org.apache.hyracks.api.comm.VSizeFrame;
@@ -48,6 +49,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.LocalResource;
@@ -212,6 +214,11 @@
failure = ExceptionUtils.suppress(failure, th);
}
}
+ if (failure == null && !failed) {
+ commitAtomicInsertDelete();
+ } else {
+ abortAtomicInsertDelete();
+ }
if (failure != null) {
throw HyracksDataException.create(failure);
}
@@ -219,6 +226,7 @@
@Override
public void fail() throws HyracksDataException {
+ this.failed = true;
if (writerOpen) {
writer.fail();
}
@@ -227,4 +235,28 @@
public boolean isPrimary() {
return isPrimary;
}
+
+ private void commitAtomicInsertDelete() throws HyracksDataException {
+ if (isPrimary) {
+ for (IIndex index : indexes) {
+ if (((ILSMIndex) index).isAtomic()) {
+ PrimaryIndexOperationTracker opTracker =
+ ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
+ opTracker.commit();
+ }
+ }
+ }
+ }
+
+ private void abortAtomicInsertDelete() throws HyracksDataException {
+ if (isPrimary) {
+ for (IIndex index : indexes) {
+ if (((ILSMIndex) index).isAtomic()) {
+ PrimaryIndexOperationTracker opTracker =
+ ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
+ opTracker.abort();
+ }
+ }
+ }
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
new file mode 100644
index 0000000..111c6e7
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.ioopcallbacks;
+
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class AtomicLSMIOOperationCallback extends LSMIOOperationCallback {
+
+ public AtomicLSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentId componentId,
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
+ super(dsInfo, lsmIndex, componentId, indexCheckpointManagerProvider);
+ }
+
+ @Override
+ public void afterFinalize(ILSMIOOperation operation) throws HyracksDataException {
+ if (operation.getStatus() == LSMIOOperationStatus.FAILURE) {
+ return;
+ }
+ if (operation.getIOOpertionType() != LSMIOOperationType.LOAD
+ && operation.getAccessor().getOpContext().getOperation() == IndexOperation.DELETE_COMPONENTS) {
+ deleteComponentsFromCheckpoint(operation);
+ } else if (operation.getIOOpertionType() == LSMIOOperationType.LOAD) {
+ addComponentToCheckpoint(operation);
+ } else if (isMerge(operation)) {
+ IoUtil.delete(getOperationMaskFilePath(operation));
+ }
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
new file mode 100644
index 0000000..3829c54
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.ioopcallbacks;
+
+import org.apache.asterix.common.api.IDatasetInfoProvider;
+import org.apache.asterix.common.api.ILSMComponentIdGeneratorFactory;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IJsonSerializable;
+import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class AtomicLSMIndexIOOperationCallbackFactory extends LSMIndexIOOperationCallbackFactory {
+
+ private static final long serialVersionUID = -2617830712731546932L;
+
+ public AtomicLSMIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory,
+ IDatasetInfoProvider datasetInfoProvider) {
+ super(idGeneratorFactory, datasetInfoProvider);
+ }
+
+ protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
+ return ((INcApplicationContext) ncCtx.getApplicationContext()).getIndexCheckpointManagerProvider();
+ }
+
+ @Override
+ public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
+ return new AtomicLSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
+ getComponentIdGenerator().getId(), getIndexCheckpointManagerProvider());
+ }
+
+ @SuppressWarnings("squid:S1172") // unused parameter
+ public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json)
+ throws HyracksDataException {
+ final ILSMComponentIdGeneratorFactory idGeneratorFactory =
+ (ILSMComponentIdGeneratorFactory) registry.deserialize(json.get("idGeneratorFactory"));
+ final IDatasetInfoProvider datasetInfoProvider =
+ (IDatasetInfoProvider) registry.deserialize(json.get("datasetInfoProvider"));
+ return new AtomicLSMIndexIOOperationCallbackFactory(idGeneratorFactory, datasetInfoProvider);
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 999636d..9ab4848 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -138,7 +138,7 @@
}
}
- private void addComponentToCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
+ protected void addComponentToCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
// will always update the checkpoint file even if no new component was created
FileReference target = operation.getTarget();
Map<String, Object> map = operation.getParameters();
@@ -150,7 +150,7 @@
indexCheckpointManagerProvider.get(ref).flushed(componentSequence, lsn, id.getMaxId());
}
- private void deleteComponentsFromCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
+ protected void deleteComponentsFromCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
// component was deleted... if a flush, do nothing.. if a merge, must update the checkpoint file
if (operation.getIOOpertionType() == LSMIOOperationType.MERGE) {
// Get component id of the last disk component
@@ -298,12 +298,12 @@
return indexCheckpointManagerProvider.get(resourceReference).getValidComponentSequence();
}
- private boolean isMerge(ILSMIOOperation operation) {
+ protected boolean isMerge(ILSMIOOperation operation) {
return operation.getIOOpertionType() == LSMIOOperationType.MERGE
&& operation.getAccessor().getOpContext().getOperation() != IndexOperation.DELETE_COMPONENTS;
}
- private static FileReference getOperationMaskFilePath(ILSMIOOperation operation) {
+ protected static FileReference getOperationMaskFilePath(ILSMIOOperation operation) {
FileReference target = operation.getTarget();
String componentSequence = getComponentSequence(target.getFile().getAbsolutePath());
FileReference idxRelPath = target.getParent();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 8880461..d7637f4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -423,16 +423,17 @@
if (createMetadataDataset) {
final double bloomFilterFalsePositiveRate =
appContext.getStorageProperties().getBloomFilterFalsePositiveRate();
- LSMBTreeLocalResourceFactory lsmBtreeFactory = new LSMBTreeLocalResourceFactory(
- storageComponentProvider.getStorageManager(), typeTraits, cmpFactories, null, null, null,
- opTrackerFactory, ioOpCallbackFactory, pageWriteCallbackFactory,
- storageComponentProvider.getMetadataPageManagerFactory(),
- new AsterixVirtualBufferCacheProvider(datasetId),
- storageComponentProvider.getIoOperationSchedulerProvider(),
- appContext.getMetadataMergePolicyFactory(), StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES,
- true, bloomFilterKeyFields, bloomFilterFalsePositiveRate, true, null,
- NoOpCompressorDecompressorFactory.INSTANCE, true,
- TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ANULL), NullIntrospector.INSTANCE, false);
+ LSMBTreeLocalResourceFactory lsmBtreeFactory =
+ new LSMBTreeLocalResourceFactory(storageComponentProvider.getStorageManager(), typeTraits,
+ cmpFactories, null, null, null, opTrackerFactory, ioOpCallbackFactory,
+ pageWriteCallbackFactory, storageComponentProvider.getMetadataPageManagerFactory(),
+ new AsterixVirtualBufferCacheProvider(datasetId),
+ storageComponentProvider.getIoOperationSchedulerProvider(),
+ appContext.getMetadataMergePolicyFactory(),
+ StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES, true, bloomFilterKeyFields,
+ bloomFilterFalsePositiveRate, true, null, NoOpCompressorDecompressorFactory.INSTANCE, true,
+ TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ANULL), NullIntrospector.INSTANCE,
+ false, false);
DatasetLocalResourceFactory dsLocalResourceFactory =
new DatasetLocalResourceFactory(datasetId, lsmBtreeFactory);
// TODO(amoudi) Creating the index should be done through the same code path as
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java
index 9a2821e..a61db6f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java
@@ -100,7 +100,8 @@
pageWriteCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
mergePolicyFactory, mergePolicyProperties, true, null, bloomFilterFalsePositiveRate,
index.isPrimaryIndex(), btreeFields, compDecompFactory, false,
- typeTraitProvider.getTypeTrait(BuiltinType.ANULL), NullIntrospector.INSTANCE, false);
+ typeTraitProvider.getTypeTrait(BuiltinType.ANULL), NullIntrospector.INSTANCE, false,
+ dataset.isAtomic());
default:
throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE,
dataset.getDatasetType().toString());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
index 88e96e7..ab4b585 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
@@ -113,7 +113,7 @@
mergePolicyFactory, mergePolicyProperties, true, bloomFilterFields,
bloomFilterFalsePositiveRate, index.isPrimaryIndex(), btreeFields, compDecompFactory,
hasBloomFilter, typeTraitProvider.getTypeTrait(BuiltinType.ANULL),
- NullIntrospector.INSTANCE, isSecondaryNoIncrementalMaintenance);
+ NullIntrospector.INSTANCE, isSecondaryNoIncrementalMaintenance, dataset.isAtomic());
} else {
//Column
List<Integer> keySourceIndicator =
@@ -127,7 +127,8 @@
pageWriteCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
mergePolicyFactory, mergePolicyProperties, bloomFilterFields, bloomFilterFalsePositiveRate,
btreeFields, compDecompFactory, typeTraitProvider.getTypeTrait(BuiltinType.ANULL),
- NullIntrospector.INSTANCE, isSecondaryNoIncrementalMaintenance, columnManagerFactory);
+ NullIntrospector.INSTANCE, isSecondaryNoIncrementalMaintenance, columnManagerFactory,
+ dataset.isAtomic());
}
default:
throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index b97880e..e8baeb1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -37,6 +37,7 @@
import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.ioopcallbacks.AtomicLSMIndexIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMIndexPageWriteCallbackFactory;
import org.apache.asterix.common.metadata.DatasetFullyQualifiedName;
@@ -81,6 +82,7 @@
import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
+import org.apache.asterix.transaction.management.runtime.NoOpCommitRuntimeFactory;
import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -473,7 +475,12 @@
*/
@SuppressWarnings("squid:S1172")
public ILSMIOOperationCallbackFactory getIoOperationCallbackFactory(Index index) throws AlgebricksException {
- return new LSMIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory(), getDatasetInfoProvider());
+ if (isAtomic()) {
+ return new AtomicLSMIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory(),
+ getDatasetInfoProvider());
+ } else {
+ return new LSMIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory(), getDatasetInfoProvider());
+ }
}
public ILSMPageWriteCallbackFactory getPageWriteCallbackFactory() throws AlgebricksException {
@@ -499,6 +506,11 @@
return new DatasetInfoProvider(getDatasetId());
}
+ public boolean isAtomic() {
+ return datasetType == DatasetType.INTERNAL
+ && ((InternalDatasetDetails) getDatasetDetails()).isDatasetWithoutTypeSpecification();
+ }
+
/**
* Get search callback factory for this dataset with the passed index and operation
*
@@ -514,6 +526,10 @@
public ISearchOperationCallbackFactory getSearchCallbackFactory(IStorageComponentProvider storageComponentProvider,
Index index, IndexOperation op, int[] primaryKeyFields, int[] primaryKeyFieldsInSecondaryIndex,
boolean proceedIndexOnlyPlan) throws AlgebricksException {
+ if (isAtomic()) {
+ return NoOpOperationCallbackFactory.INSTANCE;
+ }
+
if (index.isPrimaryIndex()) {
/**
* Due to the read-committed isolation level,
@@ -566,6 +582,10 @@
public IModificationOperationCallbackFactory getModificationCallbackFactory(
IStorageComponentProvider componentProvider, Index index, IndexOperation op, int[] primaryKeyFields)
throws AlgebricksException {
+ if (isAtomic()) {
+ return NoOpOperationCallbackFactory.INSTANCE;
+ }
+
if (index.isPrimaryIndex()) {
return op == IndexOperation.UPSERT || op == IndexOperation.INSERT
? new UpsertOperationCallbackFactory(getDatasetId(),
@@ -636,6 +656,10 @@
IBinaryHashFunctionFactory[] pkHashFunFactories = getPrimaryHashFunctionFactories(metadataProvider);
ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
pkHashFunFactories, datasetPartitions.length);
+ if (isAtomic()) {
+ return new NoOpCommitRuntimeFactory(datasetId, primaryKeyFieldPermutation,
+ metadataProvider.isWriteTransaction(), datasetPartitions, isSink, partitionerFactory);
+ }
return new CommitRuntimeFactory(datasetId, primaryKeyFieldPermutation, metadataProvider.isWriteTransaction(),
datasetPartitions, isSink, partitionerFactory);
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
index fd19937..5abad5d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
@@ -145,6 +145,10 @@
return filterSourceIndicator;
}
+ public boolean isDatasetWithoutTypeSpecification() {
+ return isDatasetWithoutTypeSpecification;
+ }
+
@Override
public DatasetType getDatasetType() {
return DatasetType.INTERNAL;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index ac798aa..3950053 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -219,9 +219,10 @@
}
}
+ boolean isDatasetWithoutTypeSpec = primaryKeyTypes != null && !primaryKeyTypes.isEmpty();
datasetDetails = new InternalDatasetDetails(fileStructure, partitioningStrategy, partitioningKey,
partitioningKey, keyFieldSourceIndicator, primaryKeyTypes, autogenerated, filterSourceIndicator,
- filterField);
+ filterField, isDatasetWithoutTypeSpec);
break;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
index 1de387f..8ee8f11 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
@@ -248,7 +248,7 @@
public void insertDeleteUpsertBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
- lockMgr.acquireDatasetModifyLock(locks, dataverseName, datasetName);
+ lockMgr.acquireDatasetExclusiveModificationLock(locks, dataverseName, datasetName);
}
@Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 3762e82..6e767ed 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
@@ -53,6 +54,7 @@
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -61,6 +63,7 @@
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.MultiComparator;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
@@ -76,7 +79,7 @@
private MultiComparator keySearchCmp;
private RangePredicate searchPred;
private final IIndexCursor[] cursors;
- private final LockThenSearchOperationCallback[] searchCallbacks;
+ private final ISearchOperationCallback[] searchCallbacks;
private final ISearchOperationCallbackFactory searchCallbackFactory;
private final IFrameTupleProcessor[] processors;
private final LSMTreeIndexAccessor[] lsmAccessorForKeyIndexes;
@@ -101,7 +104,7 @@
modCallbackFactory, null, tuplePartitionerFactory, partitionsMap);
this.sourceLoc = sourceLoc;
this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
- this.searchCallbacks = new LockThenSearchOperationCallback[partitions.length];
+ this.searchCallbacks = new ISearchOperationCallback[partitions.length];
this.cursors = new IIndexCursor[partitions.length];
this.lsmAccessorForUniqunessChecks = new LSMTreeIndexAccessor[partitions.length];
this.lsmAccessorForKeyIndexes = new LSMTreeIndexAccessor[partitions.length];
@@ -138,7 +141,7 @@
IIndexCursor cursor = cursors[i];
IIndexAccessor indexAccessor = indexAccessors[i];
LSMTreeIndexAccessor lsmAccessorForKeyIndex = lsmAccessorForKeyIndexes[i];
- LockThenSearchOperationCallback searchCallback = searchCallbacks[i];
+ ISearchOperationCallback searchCallback = searchCallbacks[i];
processors[i] = new IFrameTupleProcessor() {
@Override
public void process(FrameTupleAccessor accessor, ITupleReference tuple, int index)
@@ -155,7 +158,9 @@
try {
if (cursor.hasNext()) {
// duplicate, skip
- searchCallback.release();
+ if (searchCallback instanceof LockThenSearchOperationCallback) {
+ ((LockThenSearchOperationCallback) searchCallback).release();
+ }
duplicate = true;
}
} finally {
@@ -226,7 +231,7 @@
}
modCallbacks[i] =
modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
- searchCallbacks[i] = (LockThenSearchOperationCallback) searchCallbackFactory
+ searchCallbacks[i] = searchCallbackFactory
.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
IIndexAccessParameters iap = new IndexAccessParameters(modCallbacks[i], NoOpOperationCallback.INSTANCE);
indexAccessors[i] = index.createAccessor(iap);
@@ -240,6 +245,8 @@
new IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallbacks[i]);
lsmAccessorForUniqunessChecks[i] =
(LSMTreeIndexAccessor) indexForUniquessCheck.createAccessor(iapForUniquenessCheck);
+ setAtomicOpContextIfAtomic(indexForUniquessCheck, lsmAccessorForUniqunessChecks[i]);
+
cursors[i] = lsmAccessorForUniqunessChecks[i].createSearchCursor(false);
LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
appCtx.getTransactionSubsystem().getLogManager());
@@ -311,6 +318,12 @@
failure = CleanupUtils.close(writer, failure);
failure = CleanupUtils.close(indexHelpers, failure);
failure = CleanupUtils.close(keyIndexHelpers, failure);
+ if (failure == null && !failed) {
+ commitAtomicInsert();
+ } else {
+ abortAtomicInsert();
+ }
+
if (failure != null) {
throw HyracksDataException.create(failure);
}
@@ -318,6 +331,7 @@
@Override
public void fail() throws HyracksDataException {
+ failed = true;
writer.fail();
}
@@ -325,4 +339,24 @@
public void flush() throws HyracksDataException {
// No op since nextFrame flushes by default
}
+
+ private void commitAtomicInsert() throws HyracksDataException {
+ for (IIndex index : indexes) {
+ if (((ILSMIndex) index).isAtomic()) {
+ PrimaryIndexOperationTracker opTracker =
+ ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
+ opTracker.commit();
+ }
+ }
+ }
+
+ private void abortAtomicInsert() throws HyracksDataException {
+ for (IIndex index : indexes) {
+ if (((ILSMIndex) index).isAtomic()) {
+ PrimaryIndexOperationTracker opTracker =
+ ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
+ opTracker.abort();
+ }
+ }
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 3a9a020..03130ae 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -26,6 +26,7 @@
import java.util.Date;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
@@ -68,6 +69,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -76,6 +78,7 @@
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.projection.ITupleProjector;
import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
@@ -119,7 +122,7 @@
private final boolean hasMeta;
private final int filterFieldIndex;
private final int metaFieldIndex;
- protected final LockThenSearchOperationCallback[] searchCallbacks;
+ protected final ISearchOperationCallback[] searchCallbacks;
protected final IFrameOperationCallback[] frameOpCallbacks;
private final IFrameOperationCallbackFactory frameOpCallbackFactory;
private final ISearchOperationCallbackFactory searchCallbackFactory;
@@ -142,7 +145,7 @@
modCallbackFactory, null, tuplePartitionerFactory, partitionsMap);
this.hasSecondaries = hasSecondaries;
this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
- this.searchCallbacks = new LockThenSearchOperationCallback[partitions.length];
+ this.searchCallbacks = new ISearchOperationCallback[partitions.length];
this.cursors = new IIndexCursor[partitions.length];
this.processors = new IFrameTupleProcessor[partitions.length];
this.key = new PermutingFrameTupleReference();
@@ -180,7 +183,7 @@
for (int i = 0; i < partitions.length; i++) {
ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessors[i];
IIndexCursor cursor = cursors[i];
- LockThenSearchOperationCallback searchCallback = searchCallbacks[i];
+ ISearchOperationCallback searchCallback = searchCallbacks[i];
IModificationOperationCallback modCallback = modCallbacks[i];
IFrameOperationCallback frameOpCallback = frameOpCallbacks[i];
processors[i] = new IFrameTupleProcessor() {
@@ -189,8 +192,7 @@
throws HyracksDataException {
try {
tb.reset();
- AbstractIndexModificationOperationCallback abstractModCallback =
- (AbstractIndexModificationOperationCallback) modCallback;
+ IModificationOperationCallback abstractModCallback = modCallback;
boolean recordWasInserted = false;
boolean recordWasDeleted = false;
boolean isDelete = isDeleteOperation(tuple, numOfPrimaryKeys);
@@ -223,11 +225,17 @@
if (isDelete && prevTuple != null) {
// Only delete if it is a delete and not upsert
// And previous tuple with the same key was found
- abstractModCallback.setOp(Operation.DELETE);
+ if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback) abstractModCallback)
+ .setOp(Operation.DELETE);
+ }
lsmAccessor.forceDelete(tuple);
recordWasDeleted = true;
} else if (!isDelete) {
- abstractModCallback.setOp(Operation.UPSERT);
+ if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback) abstractModCallback)
+ .setOp(Operation.UPSERT);
+ }
lsmAccessor.forceUpsert(tuple);
recordWasInserted = true;
}
@@ -296,11 +304,12 @@
}
modCallbacks[i] =
modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
- searchCallbacks[i] = (LockThenSearchOperationCallback) searchCallbackFactory
+ searchCallbacks[i] = searchCallbackFactory
.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
IIndexAccessParameters iap = new IndexAccessParameters(modCallbacks[i], searchCallbacks[i]);
iap.getParameters().put(HyracksConstants.TUPLE_PROJECTOR, tupleProjector);
indexAccessors[i] = indexes[i].createAccessor(iap);
+ setAtomicOpContextIfAtomic(indexes[i], indexAccessors[i]);
cursors[i] = ((LSMTreeIndexAccessor) indexAccessors[i]).createSearchCursor(false);
LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) indexes[i],
appCtx.getTransactionSubsystem().getLogManager());
@@ -353,7 +362,7 @@
}
protected void writeOutput(int tupleIndex, boolean recordWasInserted, boolean recordWasDeleted,
- LockThenSearchOperationCallback searchCallback) throws IOException {
+ ISearchOperationCallback searchCallback) throws IOException {
if (recordWasInserted || recordWasDeleted) {
frameTuple.reset(accessor, tupleIndex);
for (int i = 0; i < frameTuple.getFieldCount(); i++) {
@@ -363,7 +372,9 @@
FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
} else {
try {
- searchCallback.release();
+ if (searchCallback instanceof LockThenSearchOperationCallback) {
+ ((LockThenSearchOperationCallback) searchCallback).release();
+ }
} catch (ACIDException e) {
throw HyracksDataException.create(e);
}
@@ -506,6 +517,12 @@
failure = CleanupUtils.destroy(failure, cursors);
failure = CleanupUtils.close(writer, failure);
failure = CleanupUtils.close(indexHelpers, failure);
+ if (failure == null && !failed) {
+ commitAtomicUpsert();
+ } else {
+ abortAtomicUpsert();
+ }
+
if (failure != null) {
throw HyracksDataException.create(failure);
}
@@ -531,6 +548,7 @@
@Override
public void fail() throws HyracksDataException {
+ failed = true;
writer.fail();
}
@@ -538,4 +556,24 @@
public void flush() throws HyracksDataException {
// No op since nextFrame flushes by default
}
+
+ private void commitAtomicUpsert() throws HyracksDataException {
+ for (IIndex index : indexes) {
+ if (((ILSMIndex) index).isAtomic()) {
+ PrimaryIndexOperationTracker opTracker =
+ ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
+ opTracker.commit();
+ }
+ }
+ }
+
+ private void abortAtomicUpsert() throws HyracksDataException {
+ for (IIndex index : indexes) {
+ if (((ILSMIndex) index).isAtomic()) {
+ PrimaryIndexOperationTracker opTracker =
+ ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker());
+ opTracker.abort();
+ }
+ }
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index 6d5e88b0..3dc6d37 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
/**
* This operator node is used for secondary indexes with upsert operations.
@@ -101,8 +102,7 @@
int storagePartition = tuplePartitioner.partition(accessor, i);
int storageIdx = storagePartitionId2Index.get(storagePartition);
ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessors[storageIdx];
- AbstractIndexModificationOperationCallback abstractModCallback =
- (AbstractIndexModificationOperationCallback) modCallbacks[storageIdx];
+ IModificationOperationCallback abstractModCallback = modCallbacks[storageIdx];
frameTuple.reset(accessor, i);
int operation = operationInspector.getIntegerValue(frameTuple.getFieldData(operationFieldIndex),
frameTuple.getFieldStart(operationFieldIndex), frameTuple.getFieldLength(operationFieldIndex));
@@ -111,23 +111,33 @@
if (operation == UPSERT_NEW) {
if (tupleFilterIsNull || tupleFilter.accept(frameTuple)) {
- abstractModCallback.setOp(Operation.INSERT);
+ if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback) abstractModCallback).setOp(Operation.INSERT);
+ }
lsmAccessor.forceInsert(tuple);
}
} else if (operation == UPSERT_EXISTING) {
if (!TupleUtils.equalTuples(tuple, prevTuple, numberOfFields)) {
if (prevTupleFilterIsNull || prevTupleFilter.accept(frameTuple)) {
- abstractModCallback.setOp(Operation.DELETE);
+ if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback) abstractModCallback)
+ .setOp(Operation.DELETE);
+ }
lsmAccessor.forceDelete(prevTuple);
}
if (tupleFilterIsNull || tupleFilter.accept(frameTuple)) {
- abstractModCallback.setOp(Operation.INSERT);
+ if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback) abstractModCallback)
+ .setOp(Operation.INSERT);
+ }
lsmAccessor.forceInsert(tuple);
}
}
} else if (operation == DELETE_EXISTING) {
if (prevTupleFilterIsNull || prevTupleFilter.accept(frameTuple)) {
- abstractModCallback.setOp(Operation.DELETE);
+ if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback) abstractModCallback).setOp(Operation.DELETE);
+ }
lsmAccessor.forceDelete(prevTuple);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
index 17858a3e..3eb9437 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
public class LSMSecondaryUpsertWithNestedPlanOperatorNodePushable extends LSMSecondaryUpsertOperatorNodePushable {
private final NestedTupleSourceRuntime[] startOfNewKeyPipelines;
@@ -190,11 +191,13 @@
int storagePartition = tuplePartitioner.partition(tuple.getFrameTupleAccessor(), tuple.getTupleIndex());
int storageIdx = storagePartitionId2Index.get(storagePartition);
ILSMIndexAccessor workingLSMAccessor = (ILSMIndexAccessor) indexAccessors[storageIdx];
- AbstractIndexModificationOperationCallback abstractModCallback =
- (AbstractIndexModificationOperationCallback) modCallbacks[storageIdx];
+ IModificationOperationCallback abstractModCallback = modCallbacks[storageIdx];
// Finally, pass the tuple to our accessor. There are only two operations: insert or delete.
if (this.isInsert) {
- abstractModCallback.setOp(AbstractIndexModificationOperationCallback.Operation.INSERT);
+ if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback) abstractModCallback)
+ .setOp(AbstractIndexModificationOperationCallback.Operation.INSERT);
+ }
try {
workingLSMAccessor.forceInsert(endTupleReference);
} catch (HyracksDataException e) {
@@ -203,7 +206,10 @@
}
}
} else {
- abstractModCallback.setOp(AbstractIndexModificationOperationCallback.Operation.DELETE);
+ if (abstractModCallback instanceof AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback) abstractModCallback)
+ .setOp(AbstractIndexModificationOperationCallback.Operation.DELETE);
+ }
workingLSMAccessor.forceDelete(endTupleReference);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
new file mode 100644
index 0000000..2312f15
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.runtime;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.utils.TaskUtil;
+
+public class NoOpCommitRuntime extends CommitRuntime {
+
+ public NoOpCommitRuntime(IHyracksTaskContext ctx, TxnId txnId, int datasetId, int[] primaryKeyFields,
+ boolean isWriteTransaction, int resourcePartition, boolean isSink,
+ ITuplePartitionerFactory partitionerFactory, int[] datasetPartitions) {
+ super(ctx, txnId, datasetId, primaryKeyFields, isWriteTransaction, resourcePartition, isSink,
+ partitionerFactory, datasetPartitions);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ try {
+ transactionContext = transactionManager.getTransactionContext(txnId);
+ transactionContext.setWriteTxn(isWriteTransaction);
+ if (isSink) {
+ return;
+ }
+ initAccessAppend(ctx);
+ super.open();
+ } catch (ACIDException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
+ try {
+ if (!isSink) {
+ appendTupleToFrame(t);
+ }
+ } catch (ACIDException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ IFrame message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx);
+ if (message != null
+ && MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) {
+ message.reset();
+ message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+ message.getBuffer().flip();
+ }
+ }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntimeFactory.java
new file mode 100644
index 0000000..bfe610b
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntimeFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.runtime;
+
+import org.apache.asterix.common.api.IJobEventListenerFactory;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+
+public class NoOpCommitRuntimeFactory extends CommitRuntimeFactory {
+
+ private static final long serialVersionUID = 2L;
+
+ public NoOpCommitRuntimeFactory(int datasetId, int[] primaryKeyFields, boolean isWriteTransaction,
+ int[] datasetPartitions, boolean isSink, ITuplePartitionerFactory partitionerFactory) {
+ super(datasetId, primaryKeyFields, isWriteTransaction, datasetPartitions, isSink, partitionerFactory);
+ }
+
+ @Override
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+ IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
+ return new IPushRuntime[] { new NoOpCommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(datasetId),
+ datasetId, primaryKeyFields, isWriteTransaction,
+ datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink, partitionerFactory,
+ datasetPartitions) };
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
index 3c829e6..b076cf2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
@@ -30,6 +30,8 @@
public static final String TUPLE_PROJECTOR = "TUPLE_PROJECTOR";
+ public static final String ATOMIC_OP_CONTEXT = "ATOMIC_OP_CONTEXT";
+
private HyracksConstants() {
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index c3bc5ca..e279eaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -49,6 +49,6 @@
getVirtualBufferCacheProvider(), SynchronousSchedulerProvider.INSTANCE, MERGE_POLICY_FACTORY,
MERGE_POLICY_PROPERTIES, DURABLE, bloomFilterKeyFields,
LSMTreeOperatorTestHelper.DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE, true, btreefields,
- NoOpCompressorDecompressorFactory.INSTANCE, bloomFilterKeyFields != null, null, null, false);
+ NoOpCompressorDecompressorFactory.INSTANCE, bloomFilterKeyFields != null, null, null, false, false);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 422aef3..10425f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -69,6 +69,7 @@
protected final int[] partitions;
protected final Int2IntMap storagePartitionId2Index;
protected boolean writerOpen;
+ protected boolean failed;
public IndexInsertUpdateDeleteOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
@@ -92,6 +93,7 @@
this.op = op;
this.tuple.setFieldPermutation(fieldPermutation);
this.tuplePartitioner = tuplePartitionerFactory.createPartitioner(ctx);
+ this.failed = false;
}
@Override
@@ -203,6 +205,7 @@
@Override
public void fail() throws HyracksDataException {
+ failed = true;
if (writerOpen) {
writer.fail();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
index 8c47bba..a0b592a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
@@ -61,22 +61,22 @@
ILSMIOOperationSchedulerProvider ioSchedulerProvider,
ICompressorDecompressorFactory compressorDecompressorFactory, ITypeTraits nullTypeTraits,
INullIntrospector nullIntrospector, boolean isSecondaryNoIncrementalMaintenance,
- IColumnManagerFactory columnManagerFactory) {
+ IColumnManagerFactory columnManagerFactory, boolean atomic) {
super(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, true, path, storageManager,
mergePolicyFactory, mergePolicyProperties, null, null, btreeFields, null, opTrackerProvider,
ioOpCallbackFactory, pageWriteCallbackFactory, metadataPageManagerFactory, vbcProvider,
ioSchedulerProvider, true, compressorDecompressorFactory, true, nullTypeTraits, nullIntrospector,
- isSecondaryNoIncrementalMaintenance);
+ isSecondaryNoIncrementalMaintenance, atomic);
this.columnManagerFactory = columnManagerFactory;
}
private LSMColumnBTreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields,
double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields,
ICompressorDecompressorFactory compressorDecompressorFactory, boolean hasBloomFilter,
- boolean isSecondaryNoIncrementalMaintenance, IColumnManagerFactory columnManagerFactory)
+ boolean isSecondaryNoIncrementalMaintenance, IColumnManagerFactory columnManagerFactory, boolean atomic)
throws HyracksDataException {
super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields,
- compressorDecompressorFactory, hasBloomFilter, isSecondaryNoIncrementalMaintenance);
+ compressorDecompressorFactory, hasBloomFilter, isSecondaryNoIncrementalMaintenance, atomic);
this.columnManagerFactory = columnManagerFactory;
}
@@ -93,7 +93,7 @@
opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields, metadataPageManagerFactory, false,
serviceCtx.getTracer(), compressorDecompressorFactory, nullTypeTraits, nullIntrospector,
- columnManagerFactory);
+ columnManagerFactory, atomic);
}
public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json)
@@ -109,11 +109,12 @@
JsonNode columnManagerFactoryNode = json.get("columnManagerFactory");
boolean isSecondaryNoIncrementalMaintenance =
getOrDefaultBoolean(json, "isSecondaryNoIncrementalMaintenance", false);
+ boolean atomic = getOrDefaultBoolean(json, "atomic", false);
IColumnManagerFactory columnManagerFactory =
(IColumnManagerFactory) registry.deserialize(columnManagerFactoryNode);
return new LSMColumnBTreeLocalResource(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
isPrimary, btreeFields, compDecompFactory, hasBloomFilter, isSecondaryNoIncrementalMaintenance,
- columnManagerFactory);
+ columnManagerFactory, atomic);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
index eccb7c2..2cd045a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
@@ -51,12 +51,12 @@
Map<String, String> mergePolicyProperties, int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate,
int[] btreeFields, ICompressorDecompressorFactory compressorDecompressorFactory, ITypeTraits nullTypeTraits,
INullIntrospector nullIntrospector, boolean isSecondaryNoIncrementalMaintenance,
- IColumnManagerFactory columnManagerFactory) {
+ IColumnManagerFactory columnManagerFactory, boolean atomic) {
super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
opTrackerFactory, ioOpCallbackFactory, pageWriteCallbackFactory, metadataPageManagerFactory,
vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, true, bloomFilterKeyFields,
bloomFilterFalsePositiveRate, true, btreeFields, compressorDecompressorFactory, true, nullTypeTraits,
- nullIntrospector, isSecondaryNoIncrementalMaintenance);
+ nullIntrospector, isSecondaryNoIncrementalMaintenance, atomic);
this.columnManagerFactory = columnManagerFactory;
}
@@ -66,6 +66,6 @@
bloomFilterFalsePositiveRate, fileRef.getRelativePath(), storageManager, mergePolicyFactory,
mergePolicyProperties, btreeFields, opTrackerProvider, ioOpCallbackFactory, pageWriteCallbackFactory,
metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, compressorDecompressorFactory,
- nullTypeTraits, nullIntrospector, isSecondaryNoIncrementalMaintenance, columnManagerFactory);
+ nullTypeTraits, nullIntrospector, isSecondaryNoIncrementalMaintenance, columnManagerFactory, atomic);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
index 048d9de..e95b380 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
@@ -70,11 +70,13 @@
double bloomFilterFalsePositiveRate, int fieldCount, IBinaryComparatorFactory[] cmpFactories,
ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
- int[] btreeFields, ITracer tracer, IColumnManager columnManager) throws HyracksDataException {
+ int[] btreeFields, ITracer tracer, IColumnManager columnManager, boolean atomic)
+ throws HyracksDataException {
super(ioManager, virtualBufferCaches, interiorFrameFactory, insertLeafFrameFactory, deleteLeafFrameFactory,
diskBufferCache, fileManager, componentFactory, bulkloadComponentFactory, null, null, null,
bloomFilterFalsePositiveRate, fieldCount, cmpFactories, mergePolicy, opTracker, ioScheduler,
- ioOpCallbackFactory, pageWriteCallbackFactory, true, true, btreeFields, null, true, false, tracer);
+ ioOpCallbackFactory, pageWriteCallbackFactory, true, true, btreeFields, null, true, false, tracer,
+ atomic);
this.columnManager = columnManager;
this.mergeComponentFactory = mergeComponentFactory;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
index 1a55447..6ef0dc6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
@@ -63,7 +63,7 @@
ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
int[] btreeFields, IMetadataPageManagerFactory freePageManagerFactory, boolean updateAware, ITracer tracer,
ICompressorDecompressorFactory compressorDecompressorFactory, ITypeTraits nullTypeTraits,
- INullIntrospector nullIntrospector, IColumnManagerFactory columnManagerFactory)
+ INullIntrospector nullIntrospector, IColumnManagerFactory columnManagerFactory, boolean atomic)
throws HyracksDataException {
//Tuple writers
@@ -111,6 +111,6 @@
deleteLeafFrameFactory, diskBufferCache, fileNameManager, flushComponentFactory, mergeComponentFactory,
bulkLoadComponentFactory, bloomFilterFalsePositiveRate, typeTraits.length, cmpFactories, mergePolicy,
opTracker, ioScheduler, ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields, tracer,
- columnManagerFactory.createColumnManager());
+ columnManagerFactory.createColumnManager(), atomic);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index 0a47fc0..43555ca 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -60,6 +60,7 @@
protected final int[] btreeFields;
protected final ICompressorDecompressorFactory compressorDecompressorFactory;
protected final boolean isSecondaryNoIncrementalMaintenance;
+ protected final boolean atomic;
public LSMBTreeLocalResource(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, boolean isPrimary, String path,
@@ -71,8 +72,8 @@
IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable,
ICompressorDecompressorFactory compressorDecompressorFactory, boolean hasBloomFilter,
- ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector,
- boolean isSecondaryNoIncrementalMaintenance) {
+ ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector, boolean isSecondaryNoIncrementalMaintenance,
+ boolean atomic) {
super(path, storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
opTrackerProvider, ioOpCallbackFactory, pageWriteCallbackFactory, metadataPageManagerFactory,
vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable, nullTypeTraits,
@@ -84,12 +85,13 @@
this.compressorDecompressorFactory = compressorDecompressorFactory;
this.hasBloomFilter = hasBloomFilter;
this.isSecondaryNoIncrementalMaintenance = isSecondaryNoIncrementalMaintenance;
+ this.atomic = atomic;
}
protected LSMBTreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields,
double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields,
ICompressorDecompressorFactory compressorDecompressorFactory, boolean hasBloomFilter,
- boolean isSecondaryNoIncrementalMaintenance) throws HyracksDataException {
+ boolean isSecondaryNoIncrementalMaintenance, boolean atomic) throws HyracksDataException {
super(registry, json);
this.bloomFilterKeyFields = bloomFilterKeyFields;
this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
@@ -98,6 +100,7 @@
this.compressorDecompressorFactory = compressorDecompressorFactory;
this.hasBloomFilter = hasBloomFilter;
this.isSecondaryNoIncrementalMaintenance = isSecondaryNoIncrementalMaintenance;
+ this.atomic = atomic;
}
@Override
@@ -115,7 +118,7 @@
opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
ioOpCallbackFactory, pageWriteCallbackFactory, isPrimary, filterTypeTraits, filterCmpFactories,
btreeFields, filterFields, durable, metadataPageManagerFactory, updateAware, serviceCtx.getTracer(),
- compressorDecompressorFactory, hasBloomFilter, nullTypeTraits, nullIntrospector);
+ compressorDecompressorFactory, hasBloomFilter, nullTypeTraits, nullIntrospector, atomic);
}
public boolean isSecondaryNoIncrementalMaintenance() {
@@ -141,8 +144,9 @@
.deserializeOrDefault(compressorDecompressorNode, NoOpCompressorDecompressorFactory.class);
boolean isSecondaryNoIncrementalMaintenance =
getOrDefaultBoolean(json, "isSecondaryNoIncrementalMaintenance", false);
+ boolean atomic = getOrDefaultBoolean(json, "atomic", false);
return new LSMBTreeLocalResource(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary,
- btreeFields, compDecompFactory, hasBloomFilter, isSecondaryNoIncrementalMaintenance);
+ btreeFields, compDecompFactory, hasBloomFilter, isSecondaryNoIncrementalMaintenance, atomic);
}
@Override
@@ -156,6 +160,7 @@
json.putPOJO("btreeFields", btreeFields);
json.putPOJO("compressorDecompressorFactory", compressorDecompressorFactory.toJson(registry));
json.put("isSecondaryNoIncrementalMaintenance", isSecondaryNoIncrementalMaintenance);
+ json.put("atomic", atomic);
}
protected static boolean getOrDefaultHasBloomFilter(JsonNode json, boolean isPrimary) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
index 6695e90..bb44655 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
@@ -49,6 +49,7 @@
protected final int[] btreeFields;
protected final ICompressorDecompressorFactory compressorDecompressorFactory;
protected final boolean isSecondaryNoIncrementalMaintenance;
+ protected final boolean atomic;
public LSMBTreeLocalResourceFactory(IStorageManager storageManager, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] cmpFactories, ITypeTraits[] filterTypeTraits,
@@ -60,8 +61,8 @@
Map<String, String> mergePolicyProperties, boolean durable, int[] bloomFilterKeyFields,
double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields,
ICompressorDecompressorFactory compressorDecompressorFactory, boolean hasBloomFilter,
- ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector,
- boolean isSecondaryNoIncrementalMaintenance) {
+ ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector, boolean isSecondaryNoIncrementalMaintenance,
+ boolean atomic) {
super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
opTrackerFactory, ioOpCallbackFactory, pageWriteCallbackFactory, metadataPageManagerFactory,
vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable, nullTypeTraits,
@@ -73,6 +74,7 @@
this.btreeFields = btreeFields;
this.compressorDecompressorFactory = compressorDecompressorFactory;
this.isSecondaryNoIncrementalMaintenance = isSecondaryNoIncrementalMaintenance;
+ this.atomic = atomic;
}
@Override
@@ -82,7 +84,7 @@
filterTypeTraits, filterCmpFactories, btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory,
pageWriteCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable,
compressorDecompressorFactory, hasBloomFilter, nullTypeTraits, nullIntrospector,
- isSecondaryNoIncrementalMaintenance);
+ isSecondaryNoIncrementalMaintenance, atomic);
}
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index eceefc6..05e78dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -100,11 +100,11 @@
ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
boolean needKeyDupCheck, boolean hasBloomFilter, int[] btreeFields, int[] filterFields, boolean durable,
- boolean updateAware, ITracer tracer) throws HyracksDataException {
+ boolean updateAware, ITracer tracer, boolean atomic) throws HyracksDataException {
super(ioManager, virtualBufferCaches, diskBufferCache, fileManager, bloomFilterFalsePositiveRate, mergePolicy,
opTracker, ioScheduler, ioOpCallbackFactory, pageWriteCallbackFactory, componentFactory,
bulkLoadComponentFactory, filterFrameFactory, filterManager, filterFields, durable, filterHelper,
- btreeFields, tracer);
+ btreeFields, tracer, atomic);
this.insertLeafFrameFactory = insertLeafFrameFactory;
this.deleteLeafFrameFactory = deleteLeafFrameFactory;
this.cmpFactories = cmpFactories;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
index c3ca3d5..3106b29 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
@@ -72,6 +72,23 @@
boolean updateAware, ITracer tracer, ICompressorDecompressorFactory compressorDecompressorFactory,
boolean hasBloomFilter, ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector)
throws HyracksDataException {
+ return createLSMTree(ioManager, virtualBufferCaches, file, diskBufferCache, typeTraits, cmpFactories,
+ bloomFilterKeyFields, bloomFilterFalsePositiveRate, mergePolicy, opTracker, ioScheduler,
+ ioOpCallbackFactory, pageWriteCallbackFactory, needKeyDupCheck, filterTypeTraits, filterCmpFactories,
+ btreeFields, filterFields, durable, freePageManagerFactory, updateAware, tracer,
+ compressorDecompressorFactory, hasBloomFilter, nullTypeTraits, nullIntrospector, false);
+ }
+
+ public static LSMBTree createLSMTree(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches,
+ FileReference file, IBufferCache diskBufferCache, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] cmpFactories, int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate,
+ ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
+ boolean needKeyDupCheck, ITypeTraits[] filterTypeTraits, IBinaryComparatorFactory[] filterCmpFactories,
+ int[] btreeFields, int[] filterFields, boolean durable, IMetadataPageManagerFactory freePageManagerFactory,
+ boolean updateAware, ITracer tracer, ICompressorDecompressorFactory compressorDecompressorFactory,
+ boolean hasBloomFilter, ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector, boolean atomic)
+ throws HyracksDataException {
LSMBTreeTupleWriterFactory insertTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
cmpFactories.length, false, updateAware, nullTypeTraits, nullIntrospector);
LSMBTreeTupleWriterFactory deleteTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
@@ -124,6 +141,6 @@
deleteLeafFrameFactory, diskBufferCache, fileNameManager, componentFactory, bulkLoadComponentFactory,
filterHelper, filterFrameFactory, filterManager, bloomFilterFalsePositiveRate, typeTraits.length,
cmpFactories, mergePolicy, opTracker, ioScheduler, ioOpCallbackFactory, pageWriteCallbackFactory,
- needKeyDupCheck, hasBloomFilter, btreeFields, filterFields, durable, updateAware, tracer);
+ needKeyDupCheck, hasBloomFilter, btreeFields, filterFields, durable, updateAware, tracer, atomic);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 69b9547..4756921 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -43,6 +43,27 @@
*/
public interface ILSMIndex extends IIndex {
+ boolean isAtomic();
+
+ /**
+ * Commits the atomic statement by adding all the temporary disk components generated by an statement to the
+ * list of disk components. Queries after call to this function will be able to read all the changes made by
+ * the insert/upsert/delete statement.
+ *
+ * @throws HyracksDataException
+ */
+ void commit() throws HyracksDataException;
+
+ /**
+ * Aborts the ongoing statement by destroying all temporary disk components generated by the statement and
+ * resetting the memory components
+ *
+ * @throws HyracksDataException
+ */
+ void abort() throws HyracksDataException;
+
+ ILSMMergePolicy getMergePolicy();
+
void deactivate(boolean flush) throws HyracksDataException;
@Override
@@ -106,6 +127,8 @@
void addDiskComponent(ILSMDiskComponent index) throws HyracksDataException;
+ void addBulkLoadedDiskComponent(ILSMDiskComponent c) throws HyracksDataException;
+
void subsumeMergedComponents(ILSMDiskComponent newComponent, List<ILSMComponent> mergedComponents)
throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
index c64e1e1..2ec59d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -19,11 +19,14 @@
package org.apache.hyracks.storage.am.lsm.common.dataflow;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
@@ -31,8 +34,11 @@
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFrameWriter;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexAccessor;
public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertUpdateDeleteOperatorNodePushable
implements ILSMIndexFrameWriter {
@@ -134,4 +140,12 @@
}
appender.write(writer, true);
}
+
+ protected void setAtomicOpContextIfAtomic(IIndex index, IIndexAccessor accessor) {
+ if (((ILSMIndex) index).isAtomic()) {
+ Map<String, Object> indexAccessorOpContextParameters = new HashMap<>();
+ indexAccessorOpContextParameters.put(HyracksConstants.ATOMIC_OP_CONTEXT, true);
+ ((ILSMIndexAccessor) accessor).getOpContext().setParameters(indexAccessorOpContextParameters);
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 2745b44..b10dd5c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -39,6 +39,7 @@
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.impls.AbstractSearchPredicate;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
@@ -112,6 +113,9 @@
protected final ILSMDiskComponentFactory bulkLoadComponentFactory;
protected final ILSMPageWriteCallbackFactory pageWriteCallbackFactory;
private int numScheduledFlushes = 0;
+ private final boolean atomic;
+ private final List<ILSMDiskComponent> temporaryDiskComponents;
+ private final ILSMMergePolicy mergePolicy;
public AbstractLSMIndex(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches,
IBufferCache diskBufferCache, ILSMIndexFileManager fileManager, double bloomFilterFalsePositiveRate,
@@ -119,8 +123,8 @@
ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
ILSMDiskComponentFactory componentFactory, ILSMDiskComponentFactory bulkLoadComponentFactory,
ILSMComponentFilterFrameFactory filterFrameFactory, LSMComponentFilterManager filterManager,
- int[] filterFields, boolean durable, IComponentFilterHelper filterHelper, int[] treeFields, ITracer tracer)
- throws HyracksDataException {
+ int[] filterFields, boolean durable, IComponentFilterHelper filterHelper, int[] treeFields, ITracer tracer,
+ boolean atomic) throws HyracksDataException {
this.ioManager = ioManager;
this.virtualBufferCaches = virtualBufferCaches;
this.diskBufferCache = diskBufferCache;
@@ -139,6 +143,10 @@
this.inactiveMemoryComponents = new ArrayList<>();
this.durable = durable;
this.tracer = tracer;
+ this.atomic = atomic;
+ this.temporaryDiskComponents = new ArrayList<>();
+ this.mergePolicy = mergePolicy;
+
fileManager.initLastUsedSeq(ioOpCallback.getLastValidSequence());
lsmHarness = new LSMHarness(this, ioScheduler, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled(),
tracer);
@@ -152,6 +160,25 @@
}
}
+ public AbstractLSMIndex(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches,
+ IBufferCache diskBufferCache, ILSMIndexFileManager fileManager, double bloomFilterFalsePositiveRate,
+ ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
+ ILSMDiskComponentFactory componentFactory, ILSMDiskComponentFactory bulkLoadComponentFactory,
+ ILSMComponentFilterFrameFactory filterFrameFactory, LSMComponentFilterManager filterManager,
+ int[] filterFields, boolean durable, IComponentFilterHelper filterHelper, int[] treeFields, ITracer tracer)
+ throws HyracksDataException {
+ this(ioManager, virtualBufferCaches, diskBufferCache, fileManager, bloomFilterFalsePositiveRate, mergePolicy,
+ opTracker, ioScheduler, ioOpCallbackFactory, pageWriteCallbackFactory, componentFactory,
+ bulkLoadComponentFactory, filterFrameFactory, filterManager, filterFields, durable, filterHelper,
+ treeFields, tracer, false);
+ }
+
+ @Override
+ public boolean isAtomic() {
+ return atomic;
+ }
+
@Override
public synchronized void create() throws HyracksDataException {
if (isActive) {
@@ -223,6 +250,9 @@
for (ILSMDiskComponent c : diskComponents) {
c.deactivateAndPurge();
}
+ for (ILSMDiskComponent c : temporaryDiskComponents) {
+ c.deactivateAndPurge();
+ }
}
private void deallocateMemoryComponents() throws HyracksDataException {
@@ -247,6 +277,9 @@
for (ILSMDiskComponent c : diskComponents) {
c.destroy();
}
+ for (ILSMDiskComponent c : temporaryDiskComponents) {
+ c.destroy();
+ }
}
@Override
@@ -265,6 +298,15 @@
diskComponents.clear();
}
+ @Override
+ public void abort() throws HyracksDataException {
+ resetMemoryComponents();
+ for (ILSMDiskComponent c : temporaryDiskComponents) {
+ c.deactivateAndDestroy();
+ }
+ temporaryDiskComponents.clear();
+ }
+
private void resetMemoryComponents() throws HyracksDataException {
if (memoryComponentsAllocated && memoryComponents != null) {
for (ILSMMemoryComponent c : memoryComponents) {
@@ -299,7 +341,9 @@
operationalComponents.addAll(diskComponents);
break;
case SEARCH:
- if (memoryComponentsAllocated) {
+ // search should include memory components for datasets with atomic statements not enabled or search to
+ // check duplicate key while inserts/upserts on datasets with atomic statements enabled
+ if (memoryComponentsAllocated && (!atomic || isAtomicOpContext(ctx))) {
addOperationalMemoryComponents(operationalComponents, false);
}
if (filterManager != null) {
@@ -315,6 +359,9 @@
} else {
operationalComponents.addAll(diskComponents);
}
+ if (isAtomicOpContext(ctx)) {
+ operationalComponents.addAll(temporaryDiskComponents);
+ }
break;
case REPLICATE:
@@ -328,6 +375,11 @@
}
}
+ private boolean isAtomicOpContext(ILSMIndexOperationContext ctx) {
+ Map<String, Object> ctxParameters = ctx.getParameters();
+ return ctxParameters != null && (boolean) ctxParameters.getOrDefault(HyracksConstants.ATOMIC_OP_CONTEXT, false);
+ }
+
@Override
public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException {
throw HyracksDataException.create(ErrorCode.DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX);
@@ -554,12 +606,35 @@
@Override
public void addDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
if (c != EmptyComponent.INSTANCE) {
+ if (atomic) {
+ temporaryDiskComponents.add(c);
+ LOGGER.debug("Adding new temporary disk component to index {}; current count: {}", c,
+ temporaryDiskComponents.size());
+ } else {
+ diskComponents.add(0, c);
+ }
+ }
+ validateComponentIds();
+ }
+
+ @Override
+ public void addBulkLoadedDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
+ if (c != EmptyComponent.INSTANCE) {
diskComponents.add(0, c);
}
validateComponentIds();
}
@Override
+ public void commit() throws HyracksDataException {
+ for (ILSMDiskComponent c : temporaryDiskComponents) {
+ diskComponents.add(0, c);
+ }
+ temporaryDiskComponents.clear();
+ validateComponentIds();
+ }
+
+ @Override
public void subsumeMergedComponents(ILSMDiskComponent newComponent, List<ILSMComponent> mergedComponents)
throws HyracksDataException {
int swapIndex = diskComponents.indexOf(mergedComponents.get(0));
@@ -879,4 +954,9 @@
return pageWriteCallbackFactory;
}
+ @Override
+ public ILSMMergePolicy getMergePolicy() {
+ return mergePolicy;
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index e2c9fab..9fcce8b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -190,7 +190,9 @@
if (opType == LSMOperationType.FLUSH) {
opTracker.notifyAll();
if (!failedOperation) {
- waitForLaggingMerge();
+ if (!lsmIndex.isAtomic()) {
+ waitForLaggingMerge();
+ }
}
} else if (opType == LSMOperationType.MERGE) {
opTracker.notifyAll();
@@ -299,7 +301,9 @@
componentsToBeReplicated.add(newComponent);
triggerReplication(componentsToBeReplicated, opType);
}
- mergePolicy.diskComponentAdded(lsmIndex, false);
+ if (!lsmIndex.isAtomic()) {
+ mergePolicy.diskComponentAdded(lsmIndex, false);
+ }
}
break;
case MERGE:
@@ -639,7 +643,7 @@
throw HyracksDataException.create(ioOperation.getFailure());
}
synchronized (opTracker) {
- lsmIndex.addDiskComponent(c);
+ lsmIndex.addBulkLoadedDiskComponent(c);
if (replicationEnabled) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(c);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
index 2d0f079..94d3447 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
@@ -95,7 +95,7 @@
diskBufferCache, fileManager, componentFactory, bulkLoadComponentFactory, filterHelper,
filterFrameFactory, filterManager, bloomFilterFalsePositiveRate, fieldCount, cmpFactories, mergePolicy,
opTracker, ioScheduler, ioOperationCallbackFactory, pageWriteCallbackFactory, needKeyDupCheck,
- hasBloomFilter, btreeFields, filterFields, durable, updateAware, tracer);
+ hasBloomFilter, btreeFields, filterFields, durable, updateAware, tracer, false);
addModifyCallback(AllowTestOpCallback.INSTANCE);
addSearchCallback(AllowTestOpCallback.INSTANCE);
addFlushCallback(AllowTestOpCallback.INSTANCE);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
index 039b244..04fa11f 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
@@ -63,14 +63,14 @@
btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory, pageWriteCallbackFactory,
metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable,
NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter, null, null,
- isSecondaryNoIncrementalMaintenance);
+ isSecondaryNoIncrementalMaintenance, false);
}
protected TestLsmBtreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields,
double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields, boolean hasBloomFilter,
boolean isSecondaryNoIncrementalMaintenance) throws HyracksDataException {
super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields,
- NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter, isSecondaryNoIncrementalMaintenance);
+ NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter, isSecondaryNoIncrementalMaintenance, false);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
index 48adf91..54b786b 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
@@ -53,7 +53,7 @@
vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable,
bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields,
NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter, null, null,
- isSecondaryNoIncrementalMaintenance);
+ isSecondaryNoIncrementalMaintenance, false);
}
@Override