[NO ISSUE][MTD][TX] Implement atomic metadata transactions without WAL
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- With this change, metadata transactions without WAL are made atomic on
cloud deployments.
- Some refactoring related to global transaction manager.
Change-Id: I282e0e4ca8a9bff68fa88613b9d34b14bc2b764c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17657
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index d69376a..f6456dc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -104,8 +104,7 @@
}
@Override
- public void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
- Map<String, ILSMComponentId> componentIdMap) {
+ public void handleJobPreparedMessage(JobId jobId, String nodeId, Map<String, ILSMComponentId> componentIdMap) {
IGlobalTransactionContext context = txnContextRepository.get(jobId);
if (context == null) {
LOGGER.warn("JobPreparedMessage received for jobId " + jobId
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index f6eb123..5494250 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -61,6 +61,7 @@
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -157,6 +158,9 @@
@Override
public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
state = SystemState.RECOVERING;
+ if (appCtx.isCloudDeployment()) {
+ doMetadataRecovery();
+ }
LOGGER.info("starting recovery for partitions {}", partitions);
long readableSmallestLSN = logMgr.getReadableSmallestLSN();
Checkpoint checkpointObject = checkpointManager.getLatest();
@@ -171,6 +175,11 @@
replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN, true);
}
+ public synchronized void doMetadataRecovery() {
+ LOGGER.info("starting recovery for metadata partition {}", StorageConstants.METADATA_PARTITION);
+ appCtx.getTransactionSubsystem().getTransactionManager().rollbackMetadataTransactionsWithoutWAL();
+ }
+
public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN,
boolean closeOnFlushRedo) throws IOException, ACIDException {
try {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index fb08fe3..cea3832 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -175,7 +175,7 @@
LogRecord logRecord = new LogRecord();
final long txnId = 1;
logRecord.setTxnCtx(TransactionContextFactory.create(new TxnId(txnId),
- new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)));
+ new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL), ncAppCtx));
logRecord.setLogSource(LogSource.LOCAL);
logRecord.setLogType(LogType.WAIT);
logRecord.setTxnId(txnId);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
index 498d174..956ae9e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalTxManager.java
@@ -46,8 +46,7 @@
IGlobalTransactionContext getTransactionContext(JobId jobId) throws ACIDException;
- void handleJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
- Map<String, ILSMComponentId> componentIdMap);
+ void handleJobPreparedMessage(JobId jobId, String nodeId, Map<String, ILSMComponentId> componentIdMap);
void handleJobCompletionMessage(JobId jobId, String nodeId);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 64748ce..1a23ca8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -249,7 +249,6 @@
private void commitAtomicInsertDelete() throws HyracksDataException {
if (isPrimary) {
final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
- int datasetID = -1;
boolean atomic = false;
for (IIndex index : indexes) {
if (((ILSMIndex) index).isAtomic()) {
@@ -259,14 +258,13 @@
for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
}
- datasetID = opTracker.getDatasetInfo().getDatasetID();
atomic = true;
}
}
if (atomic) {
AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
- ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+ ctx.getJobletContext().getServiceContext().getNodeId(), componentIdMap);
try {
((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
.sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
index 8adbf49..b4832ff 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AtomicJobPreparedMessage.java
@@ -38,20 +38,17 @@
private static final long serialVersionUID = 1L;
private final JobId jobId;
private final String nodeId;
- private final int datasetId;
private final Map<String, ILSMComponentId> componentIdMap;
- public AtomicJobPreparedMessage(JobId jobId, String nodeId, int datasetId,
- Map<String, ILSMComponentId> componentIdMap) {
+ public AtomicJobPreparedMessage(JobId jobId, String nodeId, Map<String, ILSMComponentId> componentIdMap) {
this.nodeId = nodeId;
- this.datasetId = datasetId;
this.componentIdMap = componentIdMap;
this.jobId = jobId;
}
@Override
public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- appCtx.getGlobalTxManager().handleJobPreparedMessage(jobId, nodeId, datasetId, componentIdMap);
+ appCtx.getGlobalTxManager().handleJobPreparedMessage(jobId, nodeId, componentIdMap);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index 30c693d..66b5359 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -112,4 +112,9 @@
*/
void ensureMaxTxnId(long txnId);
+ /**
+ * Rollback incomplete metadata transactions without WAL during recovery.
+ */
+ void rollbackMetadataTransactionsWithoutWAL();
+
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 1321c96..ed9c48e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -30,6 +30,7 @@
*/
public class StorageConstants {
+ public static final String METADATA_TXN_NOWAL_DIR_NAME = "mtd-txn-logs";
public static final String STORAGE_ROOT_DIR_NAME = "storage";
public static final String INGESTION_LOGS_DIR_NAME = "ingestion_logs";
public static final String PARTITION_DIR_PREFIX = "partition_";
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 522fd32..b40a4eb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -128,6 +128,7 @@
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -1742,6 +1743,12 @@
return sb.toString();
}
+ private void setAtomicOpContext(IIndexAccessor accessor) {
+ Map<String, Object> indexAccessorOpContextParameters = new HashMap<>();
+ indexAccessorOpContextParameters.put(HyracksConstants.ATOMIC_OP_CONTEXT, true);
+ ((ILSMIndexAccessor) accessor).getOpContext().setParameters(indexAccessorOpContextParameters);
+ }
+
private <T> void searchIndex(TxnId txnId, IMetadataIndex index, ITupleReference searchKey,
IValueExtractor<T> valueExtractor, List<T> results) throws AlgebricksException, HyracksDataException {
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
@@ -1753,6 +1760,9 @@
IIndex indexInstance = datasetLifecycleManager.get(resourceName);
datasetLifecycleManager.open(resourceName);
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ if (atomicNoWAL) {
+ setAtomicOpContext(indexAccessor);
+ }
try {
IBinaryComparator[] searchCmps = null;
MultiComparator searchCmp = null;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index c1eba7c..be935d0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -433,7 +433,7 @@
StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES, true, bloomFilterKeyFields,
bloomFilterFalsePositiveRate, true, null, NoOpCompressorDecompressorFactory.INSTANCE, true,
TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ANULL), NullIntrospector.INSTANCE,
- false, false);
+ false, appContext.isCloudDeployment());
DatasetLocalResourceFactory dsLocalResourceFactory =
new DatasetLocalResourceFactory(datasetId, lsmBtreeFactory);
// TODO(amoudi) Creating the index should be done through the same code path as
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index ec016bc..2c89b61 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -352,7 +352,6 @@
private void commitAtomicInsert() throws HyracksDataException {
final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
- int datasetID = -1;
boolean atomic = false;
for (IIndex index : indexes) {
if (((ILSMIndex) index).isAtomic()) {
@@ -362,14 +361,13 @@
for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
}
- datasetID = opTracker.getDatasetInfo().getDatasetID();
atomic = true;
}
}
if (atomic) {
AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
- ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+ ctx.getJobletContext().getServiceContext().getNodeId(), componentIdMap);
try {
((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
.sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index de95d60..6b9d3eb 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -570,7 +570,6 @@
// TODO: Refactor and remove duplicated code
private void commitAtomicUpsert() throws HyracksDataException {
final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
- int datasetID = -1;
boolean atomic = false;
for (IIndex index : indexes) {
if (((ILSMIndex) index).isAtomic()) {
@@ -580,14 +579,13 @@
for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) {
componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
}
- datasetID = opTracker.getDatasetInfo().getDatasetID();
atomic = true;
}
}
if (atomic) {
AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
- ctx.getJobletContext().getServiceContext().getNodeId(), datasetID, componentIdMap);
+ ctx.getJobletContext().getServiceContext().getNodeId(), componentIdMap);
try {
((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService())
.sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
index 3576ba1..aa698be 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
@@ -18,25 +18,46 @@
*/
package org.apache.asterix.transaction.management.service.transaction;
+import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.IndexInfo;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
import org.apache.hyracks.util.annotations.ThreadSafe;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
@ThreadSafe
public class AtomicNoWALTransactionContext extends AtomicTransactionContext {
- public AtomicNoWALTransactionContext(TxnId txnId) {
+ private final INcApplicationContext appCtx;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public AtomicNoWALTransactionContext(TxnId txnId, INcApplicationContext appCtx) {
super(txnId);
+ this.appCtx = appCtx;
}
@Override
@@ -59,7 +80,7 @@
for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
try {
- primaryIndexOpTracker.deleteMemoryComponent(true);
+ primaryIndexOpTracker.abort();
} catch (HyracksDataException e) {
throw new ACIDException(e);
}
@@ -68,19 +89,102 @@
private void ensureDurable() {
List<FlushOperation> flushes = new ArrayList<>();
+ List<Integer> datasetIds = new ArrayList<>();
+ Map<String, ILSMComponentId> resourceMap = new HashMap<>();
LogRecord dummyLogRecord = new LogRecord();
try {
for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
primaryIndexOpTracker.triggerScheduleFlush(dummyLogRecord);
flushes.addAll(primaryIndexOpTracker.getScheduledFlushes());
+ datasetIds.add(primaryIndexOpTracker.getDatasetInfo().getDatasetID());
+ for (Map.Entry<String, FlushOperation> entry : primaryIndexOpTracker.getLastFlushOperation()
+ .entrySet()) {
+ resourceMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId());
+ }
}
LSMIndexUtil.waitFor(flushes);
+ persistLogFile(datasetIds, resourceMap);
+ } catch (Exception e) {
+ deleteUncommittedRecords();
+ throw new ACIDException(e);
+ }
+ try {
+ commit();
+ } catch (Exception e) {
+ rollback(resourceMap);
+ throw new ACIDException(e);
+ } finally {
+ deleteLogFile();
+ }
+ enableMerge();
+ }
+
+ private void persistLogFile(List<Integer> datasetIds, Map<String, ILSMComponentId> resourceMap)
+ throws HyracksDataException, JsonProcessingException {
+ IIOManager ioManager = appCtx.getIoManager();
+ FileReference fref = ioManager.resolve(Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
+ StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION,
+ String.format("%s.log", txnId)).toString());
+ MetadataAtomicTransactionLog txnLog = new MetadataAtomicTransactionLog(txnId, datasetIds,
+ appCtx.getServiceContext().getNodeId(), resourceMap);
+ ioManager.overwrite(fref, OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(txnLog).getBytes());
+ }
+
+ public void deleteLogFile() {
+ IIOManager ioManager = appCtx.getIoManager();
+ try {
+ FileReference fref = ioManager.resolve(Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
+ StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION,
+ String.format("%s.log", txnId)).toString());
+ ioManager.delete(fref);
} catch (HyracksDataException e) {
throw new ACIDException(e);
}
}
+ private void commit() throws HyracksDataException {
+ for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
+ PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
+ primaryIndexOpTracker.commit();
+ }
+ }
+
+ private void enableMerge() {
+ for (ILSMOperationTracker opTrackerRef : modifiedIndexes) {
+ PrimaryIndexOperationTracker primaryIndexOpTracker = (PrimaryIndexOperationTracker) opTrackerRef;
+ for (IndexInfo indexInfo : primaryIndexOpTracker.getDatasetInfo().getIndexes().values()) {
+ if (indexInfo.getIndex().isPrimaryIndex()) {
+ try {
+ indexInfo.getIndex().getMergePolicy().diskComponentAdded(indexInfo.getIndex(), false);
+ } catch (HyracksDataException e) {
+ throw new ACIDException(e);
+ }
+ }
+ }
+ }
+ }
+
+ public void rollback(Map<String, ILSMComponentId> resourceMap) {
+ deleteUncommittedRecords();
+ IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+ datasetLifecycleManager.getIndexCheckpointManagerProvider();
+ resourceMap.forEach((k, v) -> {
+ try {
+ IIndexCheckpointManager checkpointManager = indexCheckpointManagerProvider.get(ResourceReference.of(k));
+ if (checkpointManager.getCheckpointCount() > 0) {
+ IndexCheckpoint checkpoint = checkpointManager.getLatest();
+ if (checkpoint.getLastComponentId() == v.getMaxId()) {
+ checkpointManager.deleteLatest(v.getMaxId(), 1);
+ }
+ }
+ } catch (HyracksDataException e) {
+ throw new ACIDException(e);
+ }
+ });
+ }
+
@Override
public boolean hasWAL() {
return false;
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java
new file mode 100644
index 0000000..7b3af3f
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MetadataAtomicTransactionLog {
+
+ private TxnId txnId;
+ private List<Integer> datasetIds;
+ private String nodeId;
+ private Map<String, ILSMComponentId> resourceMap;
+
+ @JsonCreator
+ public MetadataAtomicTransactionLog(@JsonProperty("txnId") TxnId txnId,
+ @JsonProperty("datasetIds") List<Integer> datasetIds, @JsonProperty("nodeId") String nodeId,
+ @JsonProperty("resourceMap") Map<String, ILSMComponentId> resourceMap) {
+ this.txnId = txnId;
+ this.datasetIds = datasetIds;
+ this.nodeId = nodeId;
+ this.resourceMap = resourceMap;
+ }
+
+ public TxnId getTxnId() {
+ return txnId;
+ }
+
+ public List<Integer> getDatasetIds() {
+ return datasetIds;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public Map<String, ILSMComponentId> getResourceMap() {
+ return resourceMap;
+ }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
index 1076086..e1e2ca8 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
@@ -20,6 +20,7 @@
import static org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.common.transactions.TxnId;
@@ -29,13 +30,13 @@
private TransactionContextFactory() {
}
- public static ITransactionContext create(TxnId txnId, TransactionOptions options) {
+ public static ITransactionContext create(TxnId txnId, TransactionOptions options, INcApplicationContext appCtx) {
final AtomicityLevel atomicityLevel = options.getAtomicityLevel();
switch (atomicityLevel) {
case ATOMIC:
return new AtomicTransactionContext(txnId);
case ATOMIC_NO_WAL:
- return new AtomicNoWALTransactionContext(txnId);
+ return new AtomicNoWALTransactionContext(txnId, appCtx);
case ENTITY_LEVEL:
return new EntityLevelTransactionContext(txnId);
default:
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 8bbfbfd..ea7fcb8 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.file.Paths;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,14 +35,19 @@
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.util.annotations.ThreadSafe;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
@ThreadSafe
public class TransactionManager implements ITransactionManager, ILifeCycleComponent {
@@ -61,7 +67,7 @@
if (txnCtx != null) {
throw new ACIDException("Transaction with the same (" + txnId + ") already exists");
}
- txnCtx = TransactionContextFactory.create(txnId, options);
+ txnCtx = TransactionContextFactory.create(txnId, options, txnSubsystem.getApplicationContext());
txnCtxRepository.put(txnId, txnCtx);
ensureMaxTxnId(txnId.getId());
return txnCtx;
@@ -187,4 +193,27 @@
LOGGER.log(Level.WARN, "exception while dumping state", e);
}
}
+
+ @Override
+ public void rollbackMetadataTransactionsWithoutWAL() {
+ IIOManager ioManager = txnSubsystem.getApplicationContext().getIoManager();
+ try {
+ Set<FileReference> txnLogFileRefs =
+ ioManager.list(ioManager.resolve(Paths
+ .get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
+ StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION)
+ .toString()));
+ ObjectMapper objectMapper = new ObjectMapper();
+ for (FileReference txnLogFileRef : txnLogFileRefs) {
+ MetadataAtomicTransactionLog atomicTransactionLog = objectMapper.readValue(
+ new String(ioManager.readAllBytes(txnLogFileRef)), MetadataAtomicTransactionLog.class);
+ AtomicNoWALTransactionContext context = new AtomicNoWALTransactionContext(
+ atomicTransactionLog.getTxnId(), txnSubsystem.getApplicationContext());
+ context.rollback(atomicTransactionLog.getResourceMap());
+ context.deleteLogFile();
+ }
+ } catch (Exception e) {
+ throw new ACIDException(e);
+ }
+ }
}