[ASTERIXDB-1464][TX] Handle Interrupts in LogManager
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Properly handle interrupts on log page
and log file switch in LogManager.
- Propagate interrupts on transactor threads.
- Ignore interrupts while waiting for txn
commit/abort log to be flushed.
- Add test case for handling interrupts in
LogManager.
- Add test case for interrupting a txn waiting
for commit log.
Change-Id: I6936a60dc572e07f01baabc3f8af3bf88a58f661
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2248
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index de0c88f..8552a1c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -3059,6 +3059,7 @@
* @param mdTxnCtx
*/
public static void abort(Exception rootE, Exception parentE, MetadataTransactionContext mdTxnCtx) {
+ boolean interrupted = Thread.interrupted();
try {
if (IS_DEBUG_MODE) {
LOGGER.log(Level.ERROR, rootE.getMessage(), rootE);
@@ -3069,6 +3070,10 @@
} catch (Exception e2) {
parentE.addSuppressed(e2);
throw new IllegalStateException(rootE);
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index f6a4aac..1a14864 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -32,6 +32,7 @@
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.common.utils.JobUtils;
import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
@@ -173,9 +174,16 @@
try {
work.run();
done = true;
- } catch (InterruptedException e) {
- LOGGER.log(Level.WARN, "Retry with attempt " + (++retryCount), e);
- interruptedException = e;
+ } catch (Exception e) {
+ Throwable rootCause = ExceptionUtils.getRootCause(e);
+ if (rootCause instanceof java.lang.InterruptedException) {
+ interruptedException = (InterruptedException) rootCause;
+ // clear the interrupted state from the thread
+ Thread.interrupted();
+ LOGGER.log(Level.WARN, "Retry with attempt " + (++retryCount), e);
+ continue;
+ }
+ throw e;
}
} while (!done);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
index 6cf1940..128aee6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common;
import java.io.InputStream;
+import java.rmi.RemoteException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;
@@ -28,11 +29,18 @@
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.rebalance.NoOpDatasetRebalanceCallback;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
import org.apache.asterix.utils.RebalanceUtil;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.io.FileSplit;
import org.junit.Assert;
import com.fasterxml.jackson.databind.JsonNode;
@@ -158,4 +166,52 @@
metadataProvider.getLocks().unlock();
}
}
+
+ /**
+ * Gets the reference of the dataset named {@code datasetName} from metadata
+ *
+ * @param integrationUtil
+ * @param datasetName
+ * @return the dataset reference if found. Otherwise null.
+ * @throws AlgebricksException
+ * @throws RemoteException
+ */
+ public static Dataset getDataset(AsterixHyracksIntegrationUtil integrationUtil, String datasetName)
+ throws AlgebricksException, RemoteException {
+ final ICcApplicationContext appCtx =
+ (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
+ final MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
+ final MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ Dataset dataset;
+ try {
+ dataset = metadataProvider.findDataset(MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME, datasetName);
+ } finally {
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ metadataProvider.getLocks().unlock();
+ }
+ return dataset;
+ }
+
+ /**
+ * Gets the file splits of {@code dataset}
+ *
+ * @param integrationUtil
+ * @param dataset
+ * @return the file splits of the dataset
+ * @throws RemoteException
+ * @throws AlgebricksException
+ */
+ public static FileSplit[] getDatasetSplits(AsterixHyracksIntegrationUtil integrationUtil, Dataset dataset)
+ throws RemoteException, AlgebricksException {
+ final ICcApplicationContext ccAppCtx =
+ (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
+ final MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ try {
+ return SplitsAndConstraintsUtil
+ .getIndexSplits(dataset, dataset.getDatasetName(), mdTxnCtx, ccAppCtx.getClusterStateManager());
+ } finally {
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ }
+ }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
index 1d31bc0..decee99 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
@@ -22,9 +22,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.ConsoleHandler;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -213,6 +210,38 @@
}
}
+ @Test
+ public void surviveInterruptOnMetadataTxnCommit() throws Exception {
+ ICcApplicationContext appCtx =
+ (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
+ final MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
+ final MetadataTransactionContext mdTxn = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxn);
+ final String nodeGroupName = "ng";
+ Thread transactor = new Thread(() -> {
+ final List<String> ngNodes = Arrays.asList("asterix_nc1");
+ try {
+ MetadataManager.INSTANCE.addNodegroup(mdTxn, new NodeGroup(nodeGroupName, ngNodes));
+ Thread.currentThread().interrupt();
+ MetadataManager.INSTANCE.commitTransaction(mdTxn);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ transactor.start();
+ transactor.join();
+ // ensure that the node group was added
+ final MetadataTransactionContext readMdTxn = MetadataManager.INSTANCE.beginTransaction();
+ try {
+ final NodeGroup nodegroup = MetadataManager.INSTANCE.getNodegroup(readMdTxn, nodeGroupName);
+ if (nodegroup == null) {
+ throw new AssertionError("nodegroup was not found after metadata txn was committed");
+ }
+ } finally {
+ MetadataManager.INSTANCE.commitTransaction(readMdTxn);
+ }
+ }
+
private void addDataset(ICcApplicationContext appCtx, Dataset source, int datasetPostfix, boolean abort)
throws Exception {
Dataset dataset = new Dataset(source.getDataverseName(), "ds_" + datasetPostfix, source.getDataverseName(),
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
new file mode 100644
index 0000000..f43f3ff
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.txn;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.TransactionUtil;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.test.common.TestTupleReference;
+import org.apache.asterix.transaction.management.service.logging.LogManager;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogManagerTest {
+
+ protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
+ private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+ private static final String PREPARE_NEXT_LOG_FILE_METHOD = "prepareNextLogFile";
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+ integrationUtil.init(true, TEST_CONFIG_FILE_NAME);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ integrationUtil.deinit(true);
+ }
+
+ @Test
+ public void interruptedLogPageSwitch() throws Exception {
+ final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+ final String nodeId = ncAppCtx.getServiceContext().getNodeId();
+
+ final String datasetName = "ds";
+ TestDataUtil.createIdOnlyDataset(datasetName);
+ final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName);
+ final String indexPath = getIndexPath(dataset, nodeId);
+ final IDatasetLifecycleManager dclm = ncAppCtx.getDatasetLifecycleManager();
+ dclm.open(indexPath);
+ final ILSMIndex index = (ILSMIndex) dclm.get(indexPath);
+ final long resourceId = ncAppCtx.getLocalResourceRepository().get(indexPath).getId();
+ final DatasetLocalResource datasetLocalResource =
+ (DatasetLocalResource) ncAppCtx.getLocalResourceRepository().get(indexPath).getResource();
+ final ITransactionContext txnCtx = beingTransaction(ncAppCtx, index, resourceId);
+ final ILogManager logManager = ncAppCtx.getTransactionSubsystem().getLogManager();
+ final ILockManager lockManager = ncAppCtx.getTransactionSubsystem().getLockManager();
+ final DatasetId datasetId = new DatasetId(dataset.getDatasetId());
+ final int[] pkFields = dataset.getPrimaryBloomFilterFields();
+ final int fieldsLength = pkFields.length;
+ final TestTupleReference tuple = new TestTupleReference(fieldsLength);
+ tuple.getFields()[0].getDataOutput().write(1);
+ final int partition = datasetLocalResource.getPartition();
+
+ // ensure interrupted thread will be interrupted on allocating next log page
+ final AtomicBoolean interrupted = new AtomicBoolean(false);
+ Thread interruptedTransactor = new Thread(() -> {
+ Thread.currentThread().interrupt();
+ try {
+ for (int i = 0; i < 10000; i++) {
+ lockManager.lock(datasetId, i, LockManagerConstants.LockMode.S, txnCtx);
+ LogRecord logRecord = new LogRecord();
+ TransactionUtil.formEntityCommitLogRecord(logRecord, txnCtx, datasetId.getId(), i, tuple, pkFields,
+ partition, LogType.ENTITY_COMMIT);
+ logManager.log(logRecord);
+ }
+ } catch (ACIDException e) {
+ Throwable rootCause = ExceptionUtils.getRootCause(e);
+ if (rootCause instanceof java.lang.InterruptedException) {
+ interrupted.set(true);
+ }
+ }
+ });
+ interruptedTransactor.start();
+ interruptedTransactor.join();
+ Assert.assertTrue(interrupted.get());
+
+ // ensure next thread will be able to allocate next page
+ final AtomicInteger failCount = new AtomicInteger(0);
+ Thread transactor = new Thread(() -> {
+ try {
+ for (int i = 0; i < 10000; i++) {
+ lockManager.lock(datasetId, i, LockManagerConstants.LockMode.S, txnCtx);
+ LogRecord logRecord = new LogRecord();
+ TransactionUtil.formEntityCommitLogRecord(logRecord, txnCtx, datasetId.getId(), i, tuple, pkFields,
+ partition, LogType.ENTITY_COMMIT);
+ logManager.log(logRecord);
+ }
+ } catch (Exception e) {
+ failCount.incrementAndGet();
+ }
+ });
+ transactor.start();
+ transactor.join();
+ Assert.assertEquals(0, failCount.get());
+ }
+
+ @Test
+ public void interruptedLogFileSwitch() throws Exception {
+ final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+ final LogManager logManager = (LogManager) ncAppCtx.getTransactionSubsystem().getLogManager();
+ int logFileCountBeforeInterrupt = logManager.getLogFileIds().size();
+
+ // ensure an interrupted transactor will create next log file but will fail to position the log channel
+ final AtomicBoolean interrupted = new AtomicBoolean(false);
+ Thread interruptedTransactor = new Thread(() -> {
+ Thread.currentThread().interrupt();
+ try {
+ prepareNextLogFile(logManager);
+ } catch (Exception e) {
+ Throwable rootCause = ExceptionUtils.getRootCause(e);
+ if (rootCause.getCause() instanceof java.nio.channels.ClosedByInterruptException) {
+ interrupted.set(true);
+ }
+ }
+ });
+ interruptedTransactor.start();
+ interruptedTransactor.join();
+ // ensure a new log file was created but the thread was interrupted
+ int logFileCountAfterInterrupt = logManager.getLogFileIds().size();
+ Assert.assertEquals(logFileCountBeforeInterrupt + 1, logFileCountAfterInterrupt);
+ Assert.assertTrue(interrupted.get());
+
+ // ensure next transactor will not create another file
+ final AtomicBoolean failed = new AtomicBoolean(false);
+ Thread transactor = new Thread(() -> {
+ try {
+ prepareNextLogFile(logManager);
+ } catch (Exception e) {
+ failed.set(true);
+ }
+ });
+ transactor.start();
+ transactor.join();
+ // make sure no new files were created and the operation was successful
+ int countAfterTransactor = logManager.getLogFileIds().size();
+ Assert.assertEquals(logFileCountAfterInterrupt, countAfterTransactor);
+ Assert.assertFalse(failed.get());
+
+ // make sure we can still log to the new file
+ interruptedLogPageSwitch();
+ }
+
+ private static String getIndexPath(Dataset dataset, String nodeId) throws Exception {
+ final FileSplit[] datasetSplits = TestDataUtil.getDatasetSplits(integrationUtil, dataset);
+ final Optional<FileSplit> nodeFileSplit =
+ Arrays.stream(datasetSplits).filter(s -> s.getNodeName().equals(nodeId)).findFirst();
+ Assert.assertTrue(nodeFileSplit.isPresent());
+ return nodeFileSplit.get().getPath();
+ }
+
+ private static ITransactionContext beingTransaction(INcApplicationContext ncAppCtx, ILSMIndex index,
+ long resourceId) {
+ final TxnId txnId = new TxnId(1);
+ final TransactionOptions options = new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL);
+ final ITransactionManager transactionManager = ncAppCtx.getTransactionSubsystem().getTransactionManager();
+ final ITransactionContext txnCtx = transactionManager.beginTransaction(txnId, options);
+ txnCtx.register(resourceId, index, NoOpOperationCallback.INSTANCE, true);
+ return txnCtx;
+ }
+
+ private static void prepareNextLogFile(LogManager logManager) throws Exception {
+ Method method;
+ try {
+ method = LogManager.class.getDeclaredMethod(PREPARE_NEXT_LOG_FILE_METHOD, null);
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Couldn't find " + PREPARE_NEXT_LOG_FILE_METHOD + " in LogManager. Was it renamed?");
+ }
+ method.setAccessible(true);
+ method.invoke(logManager, null);
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/log4j2-test.xml b/asterixdb/asterix-app/src/test/resources/log4j2-test.xml
index d17fad7..08eee10 100644
--- a/asterixdb/asterix-app/src/test/resources/log4j2-test.xml
+++ b/asterixdb/asterix-app/src/test/resources/log4j2-test.xml
@@ -27,5 +27,11 @@
<Logger name="org.apache.asterix.test" level="WARN">
<AppenderRef ref="Console"/>
</Logger>
+ <Logger name="org.apache.asterix.transaction.management.service.logging.LogFlusher" level="INFO">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.apache.asterix.utils.RebalanceUtil" level="INFO">
+ <AppenderRef ref="Console"/>
+ </Logger>
</Loggers>
</Configuration>
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index dcf8250..b67da80 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -125,14 +125,6 @@
this.fileChannel = fileChannel;
}
- public void setInitialFlushOffset(long offset) {
- try {
- fileChannel.position(offset);
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
-
@Override
public synchronized void setFull() {
this.full.set(true);
@@ -146,7 +138,7 @@
@Override
public boolean hasSpace(int logSize) {
- return appendOffset + logSize <= logPageSize;
+ return appendOffset + logSize <= logPageSize && !full.get();
}
@Override
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 4ce9c71..833f8f6 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -24,6 +24,9 @@
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -46,6 +49,7 @@
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.LogManagerProperties;
+import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.transactions.MutableLong;
import org.apache.asterix.common.transactions.TxnLogFile;
@@ -75,9 +79,9 @@
private final String nodeId;
private final FlushLogsLogger flushLogsLogger;
private final HashMap<Long, Integer> txnLogFileId2ReaderCount = new HashMap<>();
- protected final long logFileSize;
- protected final int logPageSize;
- protected final AtomicLong appendLSN;
+ private final long logFileSize;
+ private final int logPageSize;
+ private final AtomicLong appendLSN;
/*
* Mutables
*/
@@ -85,7 +89,7 @@
private LinkedBlockingQueue<ILogBuffer> flushQ;
private LinkedBlockingQueue<ILogBuffer> stashQ;
private FileChannel appendChannel;
- protected ILogBuffer appendPage;
+ private ILogBuffer appendPage;
private LogFlusher logFlusher;
private Future<? extends Object> futureLogFlusher;
protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
@@ -112,15 +116,19 @@
flushQ = new LinkedBlockingQueue<>(numLogPages);
stashQ = new LinkedBlockingQueue<>(numLogPages);
for (int i = 0; i < numLogPages; i++) {
- emptyQ.offer(new LogBuffer(txnSubsystem, logPageSize, flushLSN));
+ emptyQ.add(new LogBuffer(txnSubsystem, logPageSize, flushLSN));
}
appendLSN.set(initializeLogAnchor(nextLogFileId));
flushLSN.set(appendLSN.get());
if (LOGGER.isInfoEnabled()) {
LOGGER.info("LogManager starts logging in LSN: " + appendLSN);
}
- appendChannel = getFileChannel(appendLSN.get(), false);
- getAndInitNewPage(INITIAL_LOG_SIZE);
+ try {
+ setLogPosition(appendLSN.get());
+ } catch (IOException e) {
+ throw new ACIDException(e);
+ }
+ initNewPage(INITIAL_LOG_SIZE);
logFlusher = new LogFlusher(this, emptyQ, flushQ, stashQ);
futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher);
if (!flushLogsLogger.isAlive()) {
@@ -129,55 +137,43 @@
}
@Override
- public void log(ILogRecord logRecord) throws ACIDException {
+ public void log(ILogRecord logRecord) {
if (logRecord.getLogType() == LogType.FLUSH) {
- flushLogsQ.offer(logRecord);
+ flushLogsQ.add(logRecord);
return;
}
appendToLogTail(logRecord);
}
- protected void appendToLogTail(ILogRecord logRecord) throws ACIDException {
+ protected void appendToLogTail(ILogRecord logRecord) {
syncAppendToLogTail(logRecord);
-
- if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
- || logRecord.getLogType() == LogType.WAIT) && !logRecord.isFlushed()) {
- synchronized (logRecord) {
- while (!logRecord.isFlushed()) {
- try {
+ if (waitForFlush(logRecord) && !logRecord.isFlushed()) {
+ InvokeUtil.doUninterruptibly(() -> {
+ synchronized (logRecord) {
+ while (!logRecord.isFlushed()) {
logRecord.wait();
- } catch (InterruptedException e) {
- //ignore
}
}
- }
+ });
}
}
- protected synchronized void syncAppendToLogTail(ILogRecord logRecord) throws ACIDException {
- if (logRecord.getLogType() != LogType.FLUSH) {
+ protected static boolean waitForFlush(ILogRecord logRecord) {
+ final byte logType = logRecord.getLogType();
+ return logType == LogType.JOB_COMMIT || logType == LogType.ABORT || logType == LogType.WAIT;
+ }
+
+ synchronized void syncAppendToLogTail(ILogRecord logRecord) {
+ if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH) {
ITransactionContext txnCtx = logRecord.getTxnCtx();
if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
throw new ACIDException(
"Aborted txn(" + txnCtx.getTxnId() + ") tried to write non-abort type log record.");
}
}
-
- /*
- * To eliminate the case where the modulo of the next appendLSN = 0 (the next
- * appendLSN = the first LSN of the next log file), we do not allow a log to be
- * written at the last offset of the current file.
- */
final int logSize = logRecord.getLogSize();
- // Make sure the log will not exceed the log file size
- if (getLogFileOffset(appendLSN.get()) + logSize >= logFileSize) {
- prepareNextLogFile();
- prepareNextPage(logSize);
- } else if (!appendPage.hasSpace(logSize)) {
- prepareNextPage(logSize);
- }
+ ensureSpace(logSize);
appendPage.append(logRecord, appendLSN.get());
-
if (logRecord.getLogType() == LogType.FLUSH) {
logRecord.setLSN(appendLSN.get());
}
@@ -187,70 +183,98 @@
appendLSN.addAndGet(logSize);
}
- protected void prepareNextPage(int logSize) {
- appendPage.setFull();
- getAndInitNewPage(logSize);
- }
-
- protected void getAndInitNewPage(int logSize) {
- if (logSize > logPageSize) {
- // before creating a new page, we need to stash a normal sized page since our queues have fixed capacity
- appendPage = null;
- while (appendPage == null) {
- try {
- appendPage = emptyQ.take();
- stashQ.add(appendPage);
- } catch (InterruptedException e) {
- //ignore
- }
- }
- // for now, alloc a new buffer for each large page
- // TODO: pool large pages??
- appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN);
- appendPage.setFileChannel(appendChannel);
- flushQ.offer(appendPage);
- } else {
- appendPage = null;
- while (appendPage == null) {
- try {
- appendPage = emptyQ.take();
- } catch (InterruptedException e) {
- //ignore
- }
- }
- appendPage.reset();
- appendPage.setFileChannel(appendChannel);
- flushQ.offer(appendPage);
+ private void ensureSpace(int logSize) {
+ if (!fileHasSpace(logSize)) {
+ ensureLastPageFlushed();
+ prepareNextLogFile();
+ }
+ if (!appendPage.hasSpace(logSize)) {
+ prepareNextPage(logSize);
}
}
- protected void prepareNextLogFile() {
+ private boolean fileHasSpace(int logSize) {
+ /*
+ * To eliminate the case where the modulo of the next appendLSN = 0 (the next
+ * appendLSN = the first LSN of the next log file), we do not allow a log to be
+ * written at the last offset of the current file.
+ */
+ return getLogFileOffset(appendLSN.get()) + logSize < logFileSize;
+ }
+
+ private void prepareNextPage(int logSize) {
+ appendPage.setFull();
+ initNewPage(logSize);
+ }
+
+ private void initNewPage(int logSize) {
+ boolean largePage = logSize > logPageSize;
+ // if a new large page will be allocated, we need to stash a normal sized page
+ // since our queues have fixed capacity
+ ensureAvailablePage(largePage);
+ if (largePage) {
+ // for now, alloc a new buffer for each large page
+ // TODO: pool large pages??
+ appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN);
+ } else {
+ appendPage.reset();
+ }
+ appendPage.setFileChannel(appendChannel);
+ flushQ.add(appendPage);
+ }
+
+ private void ensureAvailablePage(boolean stash) {
+ final ILogBuffer currentPage = appendPage;
+ appendPage = null;
+ try {
+ appendPage = emptyQ.take();
+ if (stash) {
+ stashQ.add(appendPage);
+ }
+ } catch (InterruptedException e) {
+ appendPage = currentPage;
+ Thread.currentThread().interrupt();
+ throw new ACIDException(e);
+ }
+ }
+
+ private void prepareNextLogFile() {
+ final long nextFileBeginLsn = getNextFileFirstLsn();
+ try {
+ createNextLogFile();
+ setLogPosition(nextFileBeginLsn);
+ } catch (IOException e) {
+ throw new ACIDException(e);
+ }
+ // move appendLSN and flushLSN to the first LSN of the next log file
+ // only after the file was created and the channel was positioned successfully
+ appendLSN.set(nextFileBeginLsn);
+ flushLSN.set(nextFileBeginLsn);
+ LOGGER.info("Created new txn log file with id({}) starting with LSN = {}", getLogFileId(nextFileBeginLsn),
+ nextFileBeginLsn);
+ }
+
+ private long getNextFileFirstLsn() {
+ // add the remaining space in the current file
+ return appendLSN.get() + (logFileSize - getLogFileOffset(appendLSN.get()));
+ }
+
+ private void ensureLastPageFlushed() {
// Mark the page as the last page so that it will close the output file channel.
appendPage.setLastPage();
// Make sure to flush whatever left in the log tail.
appendPage.setFull();
- //wait until all log records have been flushed in the current file
synchronized (flushLSN) {
- try {
- while (flushLSN.get() != appendLSN.get()) {
- //notification will come from LogBuffer.internalFlush(.)
+ while (flushLSN.get() != appendLSN.get()) {
+ // notification will come from LogBuffer.internalFlush(.)
+ try {
flushLSN.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ACIDException(e);
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
}
}
- //move appendLSN and flushLSN to the first LSN of the next log file
- appendLSN.addAndGet(logFileSize - getLogFileOffset(appendLSN.get()));
- flushLSN.set(appendLSN.get());
- appendChannel = getFileChannel(appendLSN.get(), true);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Created new txn log file with id(" + getLogFileId(appendLSN.get()) + ") starting with LSN = "
- + appendLSN.get());
- }
- //[Notice]
- //the current log file channel is closed if
- //LogBuffer.flush() completely flush the last page of the file.
}
@Override
@@ -386,8 +410,8 @@
* The log file which contains the checkpointLSN has been reached.
* The oldest log file being accessed by a LogReader has been reached.
*/
- if (id >= checkpointLSNLogFileID
- || (txnLogFileId2ReaderCount.containsKey(id) && txnLogFileId2ReaderCount.get(id) > 0)) {
+ if (id >= checkpointLSNLogFileID || (txnLogFileId2ReaderCount.containsKey(id)
+ && txnLogFileId2ReaderCount.get(id) > 0)) {
break;
}
@@ -475,11 +499,11 @@
return logFileIds;
}
- public String getLogFilePath(long fileId) {
+ private String getLogFilePath(long fileId) {
return logDir + File.separator + logFilePrefix + "_" + fileId;
}
- public long getLogFileOffset(long lsn) {
+ private long getLogFileOffset(long lsn) {
return lsn % logFileSize;
}
@@ -500,28 +524,24 @@
return (new File(path)).mkdir();
}
- private FileChannel getFileChannel(long lsn, boolean create) {
- FileChannel newFileChannel = null;
- try {
- long fileId = getLogFileId(lsn);
- String logFilePath = getLogFilePath(fileId);
- File file = new File(logFilePath);
- if (create) {
- if (!file.createNewFile()) {
- throw new IllegalStateException();
- }
- } else {
- if (!file.exists()) {
- throw new IllegalStateException();
- }
- }
- RandomAccessFile raf = new RandomAccessFile(new File(logFilePath), "rw");
- newFileChannel = raf.getChannel();
- newFileChannel.position(getLogFileOffset(lsn));
- } catch (IOException e) {
- throw new IllegalStateException(e);
+ private void createNextLogFile() throws IOException {
+ final long nextFileBeginLsn = getNextFileFirstLsn();
+ final long fileId = getLogFileId(nextFileBeginLsn);
+ final Path nextFilePath = Paths.get(getLogFilePath(fileId));
+ if (nextFilePath.toFile().exists()) {
+ LOGGER.warn("Ignored create log file {} since file already exists", nextFilePath.toString());
+ return;
}
- return newFileChannel;
+ Files.createFile(nextFilePath);
+ }
+
+ private void setLogPosition(long lsn) throws IOException {
+ final long fileId = getLogFileId(lsn);
+ final Path targetFilePath = Paths.get(getLogFilePath(fileId));
+ final long targetPosition = getLogFileOffset(lsn);
+ final RandomAccessFile raf = new RandomAccessFile(targetFilePath.toFile(), "rw"); // NOSONAR closed by LogBuffer
+ appendChannel = raf.getChannel();
+ appendChannel.position(targetPosition);
}
@Override
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 99f365a..9b6a4f9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -25,11 +25,10 @@
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.common.transactions.ITransactionContext;
-import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.utils.InvokeUtil;
public class LogManagerWithReplication extends LogManager {
@@ -42,7 +41,7 @@
}
@Override
- public void log(ILogRecord logRecord) throws ACIDException {
+ public void log(ILogRecord logRecord) {
boolean shouldReplicate = logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT;
if (shouldReplicate) {
switch (logRecord.getLogType()) {
@@ -66,7 +65,7 @@
//Remote flush logs do not need to be flushed separately since they may not trigger local flush
if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) {
- flushLogsQ.offer(logRecord);
+ flushLogsQ.add(logRecord);
return;
}
@@ -74,7 +73,7 @@
}
@Override
- protected void appendToLogTail(ILogRecord logRecord) throws ACIDException {
+ protected void appendToLogTail(ILogRecord logRecord) {
syncAppendToLogTail(logRecord);
if (logRecord.isReplicated()) {
@@ -82,68 +81,31 @@
replicationManager.replicateLog(logRecord);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ throw new ACIDException(e);
}
}
- if (logRecord.getLogSource() == LogSource.LOCAL) {
- if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
- || logRecord.getLogType() == LogType.WAIT) && !logRecord.isFlushed()) {
+ if (logRecord.getLogSource() == LogSource.LOCAL && waitForFlush(logRecord) && !logRecord.isFlushed()) {
+ InvokeUtil.doUninterruptibly(() -> {
synchronized (logRecord) {
while (!logRecord.isFlushed()) {
- try {
- logRecord.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ logRecord.wait();
}
-
//wait for job Commit/Abort ACK from replicas
if (logRecord.isReplicated() && (logRecord.getLogType() == LogType.JOB_COMMIT
|| logRecord.getLogType() == LogType.ABORT)) {
while (!replicationManager.hasBeenReplicated(logRecord)) {
- try {
- logRecord.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ logRecord.wait();
}
}
}
- }
+ });
}
}
@Override
- protected synchronized void syncAppendToLogTail(ILogRecord logRecord) throws ACIDException {
- if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH) {
- ITransactionContext txnCtx = logRecord.getTxnCtx();
- if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
- throw new ACIDException(
- "Aborted txn(" + txnCtx.getTxnId() + ") tried to write non-abort type log record.");
- }
- }
-
- final int logRecordSize = logRecord.getLogSize();
- // Make sure the log will not exceed the log file size
- if (getLogFileOffset(appendLSN.get()) + logRecordSize >= logFileSize) {
- prepareNextLogFile();
- prepareNextPage(logRecordSize);
- } else if (!appendPage.hasSpace(logRecordSize)) {
- prepareNextPage(logRecordSize);
- }
- appendPage.append(logRecord, appendLSN.get());
-
- if (logRecord.getLogType() == LogType.FLUSH) {
- logRecord.setLSN(appendLSN.get());
- }
-
- appendLSN.addAndGet(logRecordSize);
- }
-
- @Override
public void setReplicationManager(IReplicationManager replicationManager) {
this.replicationManager = replicationManager;
this.replicationStrategy = replicationManager.getReplicationStrategy();
}
-
}