[NO ISSUE][TX][TEST] Add test for atomic statements
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add test that stops NCs at random times while upserts are
taking place on standalone collections.
- Fix the issue with rollback and recovery for atomic statements
- Add timeout while waiting for AtomicJobComplete and
AtomicJobRollback messages
- Remove datasets without type spec from checkpoint thread
- Some refactoring
Change-Id: Ifcaa65690ca99681cc5bebd8f220e5389298d61b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17724
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Murtadha Al Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
index f6456dc..b4223bc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.app.cc;
+import static org.apache.hyracks.util.ExitUtil.EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT;
+
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,8 +40,11 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -85,7 +90,8 @@
synchronized (context) {
try {
- context.wait();
+ CCConfig config = ((ClusterControllerService) serviceContext.getControllerService()).getCCConfig();
+ context.wait(config.getGlobalTxCommitTimeout());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ACIDException(e);
@@ -194,10 +200,12 @@
}
synchronized (context) {
try {
- context.wait();
+ CCConfig config = ((ClusterControllerService) serviceContext.getControllerService()).getCCConfig();
+ context.wait(config.getGlobalTxRollbackTimeout());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new ACIDException(e);
+ LOGGER.error("Error while rolling back atomic statement for {}, halting JVM", jobId);
+ ExitUtil.halt(EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT);
}
}
txnContextRepository.remove(jobId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
index 71a641e..e611d8f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.app.message;
+import static org.apache.hyracks.util.ExitUtil.EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT;
+
import java.util.List;
import java.util.Map;
@@ -32,6 +34,9 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* Message sent from CC to all NCs to rollback an atomic statement/job.
@@ -43,6 +48,8 @@
private final List<Integer> datasetIds;
private final Map<String, ILSMComponentId> componentIdMap;
+ private static final Logger LOGGER = LogManager.getLogger();
+
public AtomicJobRollbackMessage(JobId jobId, List<Integer> datasetIds,
Map<String, ILSMComponentId> componentIdMap) {
this.jobId = jobId;
@@ -57,15 +64,19 @@
datasetLifecycleManager.getIndexCheckpointManagerProvider();
componentIdMap.forEach((k, v) -> {
try {
- IIndexCheckpointManager checkpointManager = indexCheckpointManagerProvider.get(ResourceReference.of(k));
+ IIndexCheckpointManager checkpointManager =
+ indexCheckpointManagerProvider.get(ResourceReference.ofIndex(k));
if (checkpointManager.getCheckpointCount() > 0) {
IndexCheckpoint checkpoint = checkpointManager.getLatest();
if (checkpoint.getLastComponentId() == v.getMaxId()) {
+ LOGGER.info("Removing checkpoint for resource {} for component id {}", k,
+ checkpoint.getLastComponentId());
checkpointManager.deleteLatest(v.getMaxId(), 1);
}
}
- } catch (HyracksDataException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ LOGGER.error("Error while rolling back atomic statement for {}, halting JVM", jobId);
+ ExitUtil.halt(EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT);
}
});
AtomicJobRollbackCompleteMessage message =
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
index 0936512..beaaac0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
@@ -70,6 +70,9 @@
deleteInvalidIndex(appContext, localResourceRepository, resource);
}
}
+ for (Integer partition : nodePartitions) {
+ localResourceRepository.cleanup(partition);
+ }
try {
broker.sendMessageToPrimaryCC(new VoidResponse(reqId, null));
} catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 5494250..f6eb123 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -61,7 +61,6 @@
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.transactions.TxnId;
-import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -158,9 +157,6 @@
@Override
public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
state = SystemState.RECOVERING;
- if (appCtx.isCloudDeployment()) {
- doMetadataRecovery();
- }
LOGGER.info("starting recovery for partitions {}", partitions);
long readableSmallestLSN = logMgr.getReadableSmallestLSN();
Checkpoint checkpointObject = checkpointManager.getLatest();
@@ -175,11 +171,6 @@
replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN, true);
}
- public synchronized void doMetadataRecovery() {
- LOGGER.info("starting recovery for metadata partition {}", StorageConstants.METADATA_PARTITION);
- appCtx.getTransactionSubsystem().getTransactionManager().rollbackMetadataTransactionsWithoutWAL();
- }
-
public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN,
boolean closeOnFlushRedo) throws IOException, ACIDException {
try {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
index 0071ee0..435419b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
@@ -46,6 +46,10 @@
deleteInvalidMetadataIndexes(localResourceRepository);
final Set<Integer> nodePartitions = appContext.getReplicaManager().getPartitions();
localResourceRepository.deleteCorruptedResources();
+ INcApplicationContext appCtx = (INcApplicationContext) cs.getApplicationContext();
+ if (appCtx.isCloudDeployment() && nodePartitions.contains(metadataPartitionId)) {
+ appCtx.getTransactionSubsystem().getTransactionManager().rollbackMetadataTransactionsWithoutWAL();
+ }
for (Integer partition : nodePartitions) {
localResourceRepository.cleanup(partition);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 3f76a31..df2c25d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -112,12 +112,12 @@
LOGGER.info("Starting Global Recovery");
MetadataManager.INSTANCE.init();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ rollbackIncompleteAtomicTransactions(appCtx);
if (appCtx.getStorageProperties().isStorageGlobalCleanup()) {
int storageGlobalCleanupTimeout = appCtx.getStorageProperties().getStorageGlobalCleanupTimeout();
performGlobalStorageCleanup(mdTxnCtx, storageGlobalCleanupTimeout);
}
mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
- rollbackIncompleteAtomicTransactions(appCtx);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
recoveryCompleted = true;
recovering = false;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
index 06380fe..60bcbb5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -91,6 +91,22 @@
}
/**
+ * Creates a dataset without type specification
+ * @param dataset The name of the dataset
+ * @param fields The fields composing the primary key
+ * @param pkAutogenerated Is the primary key autogenerated
+ * @throws Exception
+ */
+    public static void createDatasetWithoutType(String dataset, Map<String, String> fields, boolean pkAutogenerated)
+            throws Exception {
+        // Issues: CREATE DATASET <dataset> PRIMARY KEY (<name>:<type>,...) followed by AUTOGENERATED when requested.
+        // Key fields are emitted in the iteration order of the map, so pass an ordered map for composite keys.
+        if (fields.isEmpty()) {
+            // Fail fast with a clear message: an empty key list cannot form a PRIMARY KEY clause.
+            throw new IllegalArgumentException("at least one primary key field is required");
+        }
+        StringBuilder keySpec = new StringBuilder();
+        fields.forEach((fName, fType) -> {
+            // Separate entries up front instead of trimming a trailing comma afterwards.
+            if (keySpec.length() > 0) {
+                keySpec.append(",");
+            }
+            keySpec.append(fName).append(":").append(fType);
+        });
+        TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE DATASET " + dataset + " PRIMARY KEY (" + keySpec + ")"
+                + (pkAutogenerated ? "AUTOGENERATED;" : ";"), OUTPUT_FORMAT);
+    }
+
+ /**
* Creates a secondary primary index
* @param dataset the name of the dataset
* @param indexName the name of the index
@@ -128,6 +144,22 @@
}
/**
+ * Creates a single insert statement with multiple records containing name field
+ * @param dataset The name of the dataset
+ * @param count Number of records in the insert statement
+ * @throws Exception
+ */
+    public static void insertBulkData(String dataset, long count) throws Exception {
+        // Sends one INSERT statement carrying `count` records of the form {"name": "name_<i>"}.
+        if (count <= 0) {
+            // Fail fast with a clear message rather than crashing while trimming an empty buffer.
+            throw new IllegalArgumentException("count must be positive: " + count);
+        }
+        StringBuilder records = new StringBuilder();
+        // The index is a long so it cannot overflow before reaching a long bound.
+        for (long i = 0; i < count; i++) {
+            if (i > 0) {
+                records.append(",");
+            }
+            records.append("{\"name\": \"name_").append(i).append("\"}");
+        }
+        TEST_EXECUTOR.executeSqlppUpdateOrDdl("INSERT INTO " + dataset + "([" + records + "]);",
+                TestCaseContext.OutputFormat.CLEAN_JSON);
+    }
+
+ /**
* Gets the number of records in dataset {@code dataset}
*
* @param datasetName
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicMetadataTransactionWithoutWALTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicMetadataTransactionWithoutWALTest.java
new file mode 100644
index 0000000..0d915de
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicMetadataTransactionWithoutWALTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.atomic_statements;
+
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.api.common.LocalCloudUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Restarts a cluster configured from cc-cloud-storage.conf (graceful shutdown disabled) while datasets are being
+ * created on a background thread, and asserts after every restart that two join queries between
+ * Metadata.`Dataset` and Metadata.`Index` both return a count of 0.
+ */
+public class AtomicMetadataTransactionWithoutWALTest {
+    public static final String RESOURCES_PATH = joinPath(System.getProperty("user.dir"), "src", "test", "resources");
+    public static final String CONFIG_FILE = joinPath(RESOURCES_PATH, "cc-cloud-storage.conf");
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+    private static final TestExecutor TEST_EXECUTOR = new TestExecutor();
+    private static final TestCaseContext.OutputFormat OUTPUT_FORMAT = TestCaseContext.OutputFormat.CLEAN_JSON;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private static final String DATASET_NAME_PREFIX = "ds_";
+    private static final int NUM_DATASETS = 500;
+    private static final int NUM_RECOVERIES = 10;
+
+    /** Starts the S3 cloud environment and the cluster with graceful shutdown turned off. */
+    @Before
+    public void setUp() throws Exception {
+        LocalCloudUtil.startS3CloudEnvironment(Boolean.getBoolean("cleanup.start"));
+        integrationUtil.setGracefulShutdown(false);
+        integrationUtil.init(true, CONFIG_FILE);
+    }
+
+    /** Shuts the cluster down. */
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    /** Creates NUM_DATASETS datasets named ds_0, ds_1, ... with an autogenerated uuid key. */
+    private void createDatasets() throws Exception {
+        for (int idx = 0; idx < NUM_DATASETS; idx++) {
+            TestDataUtil.createDatasetWithoutType(DATASET_NAME_PREFIX + idx, Map.of("id", "uuid"), true);
+        }
+    }
+
+    /** Body of the background thread: a failure is only printed, because the cluster is shut down while it runs. */
+    private void createDatasetsIgnoringFailures() {
+        try {
+            createDatasets();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testAtomicityWithFailures() throws Exception {
+        final String leftJoinQuery = "SELECT VALUE COUNT(*) FROM Metadata.`Dataset` ds LEFT JOIN Metadata.`Index` i "
+                + "ON ds.DatasetName=i.DatasetName AND i.IsPrimary=true WHERE ds.DatasetId!=0 AND i.DatasetName IS MISSING;";
+        final String rightJoinQuery = "SELECT VALUE COUNT(*) FROM Metadata.`Dataset` ds RIGHT JOIN Metadata.`Index` i "
+                + "ON ds.DatasetName=i.DatasetName AND i.IsPrimary=true WHERE ds.DatasetId!=0 AND ds.DatasetName IS MISSING;";
+        final Random delay = new Random();
+        // NOTE(review): `<=` makes this run NUM_RECOVERIES + 1 rounds; confirm that is intended.
+        int round = 0;
+        while (round <= NUM_RECOVERIES) {
+            new Thread(this::createDatasetsIgnoringFailures).start();
+            // Let dataset creation run for 10-109 ms before restarting the cluster.
+            Thread.sleep(10 + delay.nextInt(100));
+            integrationUtil.deinit(true);
+            integrationUtil.init(true, CONFIG_FILE);
+
+            Assert.assertEquals(0, runCountQuery(leftJoinQuery));
+            Assert.assertEquals(0, runCountQuery(rightJoinQuery));
+            round++;
+        }
+    }
+
+    /** Runs {@code query} through the query service and returns its single result row as an int. */
+    private int runCountQuery(String query) throws Exception {
+        JsonNode rows = OBJECT_MAPPER.readValue(
+                TEST_EXECUTOR.executeQueryService(query, TEST_EXECUTOR.getEndpoint(Servlets.QUERY_SERVICE),
+                        OUTPUT_FORMAT, StandardCharsets.UTF_8),
+                ObjectNode.class).get("results");
+        Assert.assertEquals(1, rows.size());
+        return rows.get(0).asInt();
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java
new file mode 100644
index 0000000..5219e011
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.atomic_statements;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Runs concurrent bulk INSERT statements against several datasets. With restarts in between (graceful shutdown
+ * disabled), every dataset count must stay a multiple of BATCH_SIZE; without restarts, every record must arrive.
+ */
+public class AtomicStatementsTest {
+    private static final String TEST_CONFIG_FILE_NAME = "cc.conf";
+    private static final String TEST_CONFIG_PATH =
+            String.join(File.separator, System.getProperty("user.dir"), "src", "main", "resources");
+    private static final String TEST_CONFIG_FILE_PATH =
+            String.join(File.separator, TEST_CONFIG_PATH, TEST_CONFIG_FILE_NAME);
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+    private static final String DATASET_NAME_PREFIX = "ds_";
+    private static final int NUM_DATASETS = 5;
+    private static final int BATCH_SIZE = 100;
+    private static final int NUM_UPSERTS = 100;
+    private static final int NUM_RECOVERIES = 10;
+
+    /** Starts the cluster with graceful shutdown turned off and creates the test datasets. */
+    @Before
+    public void setUp() throws Exception {
+        integrationUtil.setGracefulShutdown(false);
+        integrationUtil.init(true, TEST_CONFIG_FILE_PATH);
+        createDatasets();
+    }
+
+    /** Shuts the cluster down. */
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    /** Creates ds_0 .. ds_(NUM_DATASETS - 1), each with an autogenerated uuid key and a secondary index on name. */
+    private void createDatasets() throws Exception {
+        for (int idx = 0; idx < NUM_DATASETS; idx++) {
+            final String name = DATASET_NAME_PREFIX + idx;
+            TestDataUtil.createDatasetWithoutType(name, Map.of("id", "uuid"), true);
+            TestDataUtil.createSecondaryBTreeIndex(name, name + "_sidx", "name:string");
+        }
+    }
+
+    /** Starts and returns a thread that issues NUM_UPSERTS bulk inserts of BATCH_SIZE records into {@code dataset}. */
+    private Thread insertRecords(String dataset) {
+        Thread inserter = new Thread(() -> runInserts(dataset));
+        inserter.start();
+        return inserter;
+    }
+
+    /** Thread body: stops at the first failed insert and only prints it. */
+    private static void runInserts(String dataset) {
+        try {
+            for (int batch = 0; batch < NUM_UPSERTS; batch++) {
+                TestDataUtil.insertBulkData(dataset, BATCH_SIZE);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /** Starts {@code threadsPerDataset} inserter threads per dataset, dataset by dataset, and returns them. */
+    private List<Thread> startInserters(int threadsPerDataset) {
+        List<Thread> started = new ArrayList<>();
+        for (int ds = 0; ds < NUM_DATASETS; ds++) {
+            for (int t = 0; t < threadsPerDataset; t++) {
+                started.add(insertRecords(DATASET_NAME_PREFIX + ds));
+            }
+        }
+        return started;
+    }
+
+    @Test
+    public void testAtomicityWithFailures() throws Exception {
+        final Random delay = new Random();
+        // NOTE(review): `<=` makes this run NUM_RECOVERIES + 1 rounds; confirm that is intended.
+        for (int round = 0; round <= NUM_RECOVERIES; round++) {
+            // The inserters are not joined: the restart below is meant to interrupt them mid-flight.
+            startInserters(1);
+            Thread.sleep(500 + delay.nextInt(2000));
+            integrationUtil.deinit(false);
+            integrationUtil.init(false, TEST_CONFIG_FILE_PATH);
+
+            for (int ds = 0; ds < NUM_DATASETS; ds++) {
+                final long countAfterRecovery = TestDataUtil.getDatasetCount(DATASET_NAME_PREFIX + ds);
+                // A count that is not a multiple of BATCH_SIZE means a batch was only partially applied.
+                Assert.assertEquals(0, countAfterRecovery % BATCH_SIZE);
+            }
+        }
+    }
+
+    @Test
+    public void testAtomicityWithoutFailures() throws Exception {
+        // Two concurrent inserters per dataset; wait for all of them before counting.
+        for (Thread inserter : startInserters(2)) {
+            inserter.join();
+        }
+        for (int ds = 0; ds < NUM_DATASETS; ds++) {
+            long count = TestDataUtil.getDatasetCount(DATASET_NAME_PREFIX + ds);
+            Assert.assertEquals(2 * NUM_UPSERTS * BATCH_SIZE, count);
+        }
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 7e38b8b..34bc587 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -279,7 +279,7 @@
for (FlushOperation flush : lastFlushOperation.values()) {
FileReference target = flush.getTarget();
Map<String, Object> map = flush.getParameters();
- final LSMComponentId id = (LSMComponentId) map.get(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID);
+ final LSMComponentId id = (LSMComponentId) map.get(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID);
final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
final long componentSequence = IndexComponentFileReference.of(ref.getName()).getSequenceEnd();
indexCheckpointManagerProvider.get(ref).flushed(componentSequence, 0L, id.getMaxId());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index be935d0..014bc5c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -37,6 +37,7 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.common.ioopcallbacks.AtomicLSMIndexIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMIndexPageWriteCallbackFactory;
import org.apache.asterix.common.utils.StorageConstants;
@@ -403,8 +404,9 @@
: new SecondaryIndexOperationTrackerFactory(datasetId);
ILSMComponentIdGeneratorFactory idGeneratorProvider = new DatasetLSMComponentIdGeneratorFactory(datasetId);
DatasetInfoProvider datasetInfoProvider = new DatasetInfoProvider(datasetId);
- ILSMIOOperationCallbackFactory ioOpCallbackFactory =
- new LSMIndexIOOperationCallbackFactory(idGeneratorProvider, datasetInfoProvider);
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory = appContext.isCloudDeployment()
+ ? new AtomicLSMIndexIOOperationCallbackFactory(idGeneratorProvider, datasetInfoProvider)
+ : new LSMIndexIOOperationCallbackFactory(idGeneratorProvider, datasetInfoProvider);
ILSMPageWriteCallbackFactory pageWriteCallbackFactory = new LSMIndexPageWriteCallbackFactory();
IStorageComponentProvider storageComponentProvider = appContext.getStorageComponentProvider();
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index f09248f..460f393 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -139,7 +139,8 @@
return lsmIndex -> {
if (lsmIndex.isPrimaryIndex()) {
PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) lsmIndex.getOperationTracker();
- return currentTime - opTracker.getLastFlushTime() >= datasetCheckpointIntervalNanos;
+ return !lsmIndex.isAtomic()
+ && currentTime - opTracker.getLastFlushTime() >= datasetCheckpointIntervalNanos;
}
return false;
};
@@ -148,7 +149,7 @@
private Predicate<ILSMIndex> newLaggingDatasetPredicate(long checkpointTargetLSN) {
return lsmIndex -> {
final LSMIOOperationCallback ioCallback = (LSMIOOperationCallback) lsmIndex.getIOOperationCallback();
- return ioCallback.getPersistenceLsn() < checkpointTargetLSN;
+ return !lsmIndex.isAtomic() && ioCallback.getPersistenceLsn() < checkpointTargetLSN;
};
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
index aa698be..151d9ef 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicNoWALTransactionContext.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.transaction.management.service.transaction;
+import static org.apache.hyracks.util.ExitUtil.EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT;
+
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
@@ -44,14 +46,19 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.annotations.ThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
@ThreadSafe
public class AtomicNoWALTransactionContext extends AtomicTransactionContext {
+ private static final Logger LOGGER = LogManager.getLogger();
private final INcApplicationContext appCtx;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -111,9 +118,13 @@
}
try {
commit();
- } catch (Exception e) {
- rollback(resourceMap);
- throw new ACIDException(e);
+ } catch (HyracksDataException e) {
+ try {
+ rollback(resourceMap);
+ } catch (Exception ex) {
+ LOGGER.error("Error while rolling back atomic statement for {}, halting JVM", txnId);
+ ExitUtil.halt(EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT);
+ }
} finally {
deleteLogFile();
}
@@ -122,17 +133,25 @@
private void persistLogFile(List<Integer> datasetIds, Map<String, ILSMComponentId> resourceMap)
throws HyracksDataException, JsonProcessingException {
- IIOManager ioManager = appCtx.getIoManager();
+ IIOManager ioManager = appCtx.getPersistenceIoManager();
FileReference fref = ioManager.resolve(Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION,
String.format("%s.log", txnId)).toString());
- MetadataAtomicTransactionLog txnLog = new MetadataAtomicTransactionLog(txnId, datasetIds,
- appCtx.getServiceContext().getNodeId(), resourceMap);
- ioManager.overwrite(fref, OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(txnLog).getBytes());
+ ioManager.overwrite(fref, OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
+ .writeValueAsString(toJson(datasetIds, resourceMap)).getBytes());
+ }
+
+    /**
+     * Builds the JSON object written to the transaction log file, with fields in this order:
+     * txnId, datasetIds, nodeId, resourceMap.
+     */
+    private ObjectNode toJson(List<Integer> datasetIds, Map<String, ILSMComponentId> resourceMap) {
+        return OBJECT_MAPPER.createObjectNode()
+                .put("txnId", txnId.getId())
+                .putPOJO("datasetIds", datasetIds)
+                .put("nodeId", appCtx.getServiceContext().getNodeId())
+                .putPOJO("resourceMap", resourceMap);
+    }
public void deleteLogFile() {
- IIOManager ioManager = appCtx.getIoManager();
+ IIOManager ioManager = appCtx.getPersistenceIoManager();
try {
FileReference fref = ioManager.resolve(Paths.get(StorageConstants.METADATA_TXN_NOWAL_DIR_NAME,
StorageConstants.PARTITION_DIR_PREFIX + StorageConstants.METADATA_PARTITION,
@@ -172,10 +191,13 @@
datasetLifecycleManager.getIndexCheckpointManagerProvider();
resourceMap.forEach((k, v) -> {
try {
- IIndexCheckpointManager checkpointManager = indexCheckpointManagerProvider.get(ResourceReference.of(k));
+ IIndexCheckpointManager checkpointManager =
+ indexCheckpointManagerProvider.get(ResourceReference.ofIndex(k));
if (checkpointManager.getCheckpointCount() > 0) {
IndexCheckpoint checkpoint = checkpointManager.getLatest();
if (checkpoint.getLastComponentId() == v.getMaxId()) {
+ LOGGER.info("Removing checkpoint for resource {} for component id {}", k,
+ checkpoint.getLastComponentId());
checkpointManager.deleteLatest(v.getMaxId(), 1);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java
deleted file mode 100644
index 7b3af3f..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MetadataAtomicTransactionLog.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.transaction;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.transactions.TxnId;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class MetadataAtomicTransactionLog {
-
- private TxnId txnId;
- private List<Integer> datasetIds;
- private String nodeId;
- private Map<String, ILSMComponentId> resourceMap;
-
- @JsonCreator
- public MetadataAtomicTransactionLog(@JsonProperty("txnId") TxnId txnId,
- @JsonProperty("datasetIds") List<Integer> datasetIds, @JsonProperty("nodeId") String nodeId,
- @JsonProperty("resourceMap") Map<String, ILSMComponentId> resourceMap) {
- this.txnId = txnId;
- this.datasetIds = datasetIds;
- this.nodeId = nodeId;
- this.resourceMap = resourceMap;
- }
-
- public TxnId getTxnId() {
- return txnId;
- }
-
- public List<Integer> getDatasetIds() {
- return datasetIds;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public Map<String, ILSMComponentId> getResourceMap() {
- return resourceMap;
- }
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index ea7fcb8..4c7f53b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -23,6 +23,8 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -41,12 +43,16 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.util.annotations.ThreadSafe;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
@ThreadSafe
public class TransactionManager implements ITransactionManager, ILifeCycleComponent {
@@ -196,7 +202,7 @@
+ /**
+ * Rolls back the metadata transactions described by the log files listed in the resolved
+ * transaction-log directory. Each file is parsed as a JSON object; its "txnId" and "resourceMap"
+ * fields are used to build an {@link AtomicNoWALTransactionContext}, which is rolled back against
+ * that resource map and then asked to delete its log file.
+ *
+ * @throws ACIDException wrapping any exception raised while listing, reading, parsing or rolling back
+ */
@Override
public void rollbackMetadataTransactionsWithoutWAL() {
- IIOManager ioManager = txnSubsystem.getApplicationContext().getIoManager();
+ IIOManager ioManager = txnSubsystem.getApplicationContext().getPersistenceIoManager();
try {
Set<FileReference> txnLogFileRefs =
ioManager.list(ioManager.resolve(Paths
@@ -205,15 +211,30 @@
.toString()));
ObjectMapper objectMapper = new ObjectMapper();
for (FileReference txnLogFileRef : txnLogFileRefs) {
- MetadataAtomicTransactionLog atomicTransactionLog = objectMapper.readValue(
- new String(ioManager.readAllBytes(txnLogFileRef)), MetadataAtomicTransactionLog.class);
- AtomicNoWALTransactionContext context = new AtomicNoWALTransactionContext(
- atomicTransactionLog.getTxnId(), txnSubsystem.getApplicationContext());
- context.rollback(atomicTransactionLog.getResourceMap());
+ // NOTE(review): the bytes are decoded with the platform default charset; presumably this mirrors
+ // how the log file is written - confirm against the writer before changing it.
+ ObjectNode atomicTransactionLog =
+ objectMapper.readValue(new String(ioManager.readAllBytes(txnLogFileRef)), ObjectNode.class);
+ // Read the id as a long (like minId/maxId below in this class): asInt() would silently truncate
+ // any transaction id above Integer.MAX_VALUE and the rebuilt context would carry the wrong id.
+ TxnId txnId = new TxnId(atomicTransactionLog.get("txnId").asLong());
+ JsonNode jsonNode = atomicTransactionLog.get("resourceMap");
+ Map<String, ILSMComponentId> resourceMap = getResourceMapFromJson(jsonNode);
+ AtomicNoWALTransactionContext context =
+ new AtomicNoWALTransactionContext(txnId, txnSubsystem.getApplicationContext());
+ context.rollback(resourceMap);
context.deleteLogFile();
}
} catch (Exception e) {
throw new ACIDException(e);
}
}
+
+ /**
+ * Rebuilds the resource-path to component-id mapping from its JSON representation.
+ *
+ * @param jsonNode JSON object whose field names are resource paths and whose values each carry
+ * numeric "minId" and "maxId" fields
+ * @return a new map from every resource path to an {@link LSMComponentId} built from its minId/maxId
+ */
+ private Map<String, ILSMComponentId> getResourceMapFromJson(JsonNode jsonNode) {
+ Map<String, ILSMComponentId> componentIdsByPath = new HashMap<>();
+ Iterator<String> pathIterator = jsonNode.fieldNames();
+ while (pathIterator.hasNext()) {
+ String path = pathIterator.next();
+ JsonNode idRange = jsonNode.get(path);
+ long minId = idRange.get("minId").asLong();
+ long maxId = idRange.get("maxId").asLong();
+ componentIdsByPath.put(path, new LSMComponentId(minId, maxId));
+ }
+ return componentIdsByPath;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index 6904e3c..18825da 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -89,7 +89,9 @@
STRING,
appConfig -> FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR),
"global-txn-log"),
- ControllerConfig.Option.DEFAULT_DIR.cmdline() + "/global-txn-log");
+ ControllerConfig.Option.DEFAULT_DIR.cmdline() + "/global-txn-log"),
+ GLOBAL_TXN_COMMIT_TIMEOUT(LONG, 600000L),
+ GLOBAL_TXN_ROLLBACK_TIMEOUT(LONG, 600000L);
private final IOptionType parser;
private Object defaultValue;
@@ -213,6 +215,10 @@
return "Path to HTTP basic credentials";
case GLOBAL_TXN_LOG_DIR:
return "Directory to store global transaction logs";
+ case GLOBAL_TXN_COMMIT_TIMEOUT:
+ return "Timeout for Commit";
+ case GLOBAL_TXN_ROLLBACK_TIMEOUT:
+ return "Timeout for Rollback";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -492,4 +498,11 @@
return getAppConfig().getString(Option.GLOBAL_TXN_LOG_DIR);
}
+ /**
+ * Reads the {@code GLOBAL_TXN_COMMIT_TIMEOUT} option from the application configuration.
+ *
+ * @return the configured commit timeout; presumably in milliseconds (TODO confirm the unit)
+ */
+ public long getGlobalTxCommitTimeout() {
+ final long commitTimeout = getAppConfig().getLong(Option.GLOBAL_TXN_COMMIT_TIMEOUT);
+ return commitTimeout;
+ }
+
+ /**
+ * Reads the {@code GLOBAL_TXN_ROLLBACK_TIMEOUT} option from the application configuration.
+ *
+ * @return the configured rollback timeout; presumably in milliseconds (TODO confirm the unit)
+ */
+ public long getGlobalTxRollbackTimeout() {
+ final long rollbackTimeout = getAppConfig().getLong(Option.GLOBAL_TXN_ROLLBACK_TIMEOUT);
+ return rollbackTimeout;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index 8f8e8f6..0f03562 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -58,6 +58,7 @@
public static final int EC_ACTIVE_RECOVERY_FAILURE = 20;
public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22;
public static final int EC_INCONSISTENT_STORAGE_REFERENCES = 23;
+ public static final int EC_FAILED_TO_ROLLBACK_ATOMIC_STATEMENT = 24;
public static final int EC_IMMEDIATE_HALT = 33;
public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;
public static final int EC_IO_SCHEDULER_FAILED = 55;