[ASTERIXDB-1464][TX] Handle Interrupts in LogManager

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Properly handle interrupts on log page
  and log file switch in LogManager.
- Propagate interrupts to transactor threads.
- Ignore interrupts while waiting for txn
  commit/abort log to be flushed.
- Add test case for handling interrupts in
  LogManager.
- Add test case for interrupting a txn waiting
  for commit log.
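
The commit/abort flush wait now ignores interrupts; in sketch
form (mirroring the new appendToLogTail):

    InvokeUtil.doUninterruptibly(() -> {
        synchronized (logRecord) {
            while (!logRecord.isFlushed()) {
                logRecord.wait();
            }
        }
    });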

Change-Id: I6936a60dc572e07f01baabc3f8af3bf88a58f661
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2248
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index de0c88f..8552a1c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -3059,6 +3059,7 @@
      * @param mdTxnCtx
      */
     public static void abort(Exception rootE, Exception parentE, MetadataTransactionContext mdTxnCtx) {
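+        // clear the interrupt flag so the metadata abort can proceed; it is restored in the finally block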
+        boolean interrupted = Thread.interrupted();
         try {
             if (IS_DEBUG_MODE) {
                 LOGGER.log(Level.ERROR, rootE.getMessage(), rootE);
@@ -3069,6 +3070,10 @@
         } catch (Exception e2) {
             parentE.addSuppressed(e2);
             throw new IllegalStateException(rootE);
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index f6a4aac..1a14864 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
@@ -173,9 +174,16 @@
             try {
                 work.run();
                 done = true;
-            } catch (InterruptedException e) {
-                LOGGER.log(Level.WARN, "Retry with attempt " + (++retryCount), e);
-                interruptedException = e;
+            } catch (Exception e) {
+                Throwable rootCause = ExceptionUtils.getRootCause(e);
+                if (rootCause instanceof InterruptedException) {
+                    interruptedException = (InterruptedException) rootCause;
+                    // clear the interrupted state from the thread
+                    Thread.interrupted();
+                    LOGGER.log(Level.WARN, "Retry with attempt " + (++retryCount), e);
+                    continue;
+                }
+                throw e;
             }
         } while (!done);
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
index 6cf1940..128aee6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common;
 
 import java.io.InputStream;
+import java.rmi.RemoteException;
 import java.util.Arrays;
 import java.util.LinkedHashSet;
 import java.util.Map;
@@ -28,11 +29,18 @@
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.rebalance.NoOpDatasetRebalanceCallback;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.asterix.utils.RebalanceUtil;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.io.FileSplit;
 import org.junit.Assert;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -158,4 +166,52 @@
             metadataProvider.getLocks().unlock();
         }
     }
+
+    /**
+     * Gets the reference of dataset {@code datasetName} from metadata
+     *
+     * @param integrationUtil
+     * @param datasetName
+     * @return the dataset reference if found. Otherwise null.
+     * @throws AlgebricksException
+     * @throws RemoteException
+     */
+    public static Dataset getDataset(AsterixHyracksIntegrationUtil integrationUtil, String datasetName)
+            throws AlgebricksException, RemoteException {
+        final ICcApplicationContext appCtx =
+                (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
+        final MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
+        final MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        Dataset dataset;
+        try {
+            dataset = metadataProvider.findDataset(MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME, datasetName);
+        } finally {
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            metadataProvider.getLocks().unlock();
+        }
+        return dataset;
+    }
+
+    /**
+     * Gets the file splits of {@code dataset}
+     *
+     * @param integrationUtil
+     * @param dataset
+     * @return the file splits of the dataset
+     * @throws RemoteException
+     * @throws AlgebricksException
+     */
+    public static FileSplit[] getDatasetSplits(AsterixHyracksIntegrationUtil integrationUtil, Dataset dataset)
+            throws RemoteException, AlgebricksException {
+        final ICcApplicationContext ccAppCtx =
+                (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
+        final MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        try {
+            return SplitsAndConstraintsUtil
+                    .getIndexSplits(dataset, dataset.getDatasetName(), mdTxnCtx, ccAppCtx.getClusterStateManager());
+        } finally {
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        }
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
index 1d31bc0..decee99 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
@@ -22,9 +22,6 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.ConsoleHandler;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -213,6 +210,38 @@
         }
     }
 
+    @Test
+    public void surviveInterruptOnMetadataTxnCommit() throws Exception {
+        ICcApplicationContext appCtx =
+                (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
+        final MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
+        final MetadataTransactionContext mdTxn = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxn);
+        final String nodeGroupName = "ng";
+        Thread transactor = new Thread(() -> {
+            final List<String> ngNodes = Arrays.asList("asterix_nc1");
+            try {
+                MetadataManager.INSTANCE.addNodegroup(mdTxn, new NodeGroup(nodeGroupName, ngNodes));
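+                // interrupt the thread before commit; the commit is expected to survive the interrupt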
+                Thread.currentThread().interrupt();
+                MetadataManager.INSTANCE.commitTransaction(mdTxn);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+        transactor.start();
+        transactor.join();
+        // ensure that the node group was added
+        final MetadataTransactionContext readMdTxn = MetadataManager.INSTANCE.beginTransaction();
+        try {
+            final NodeGroup nodegroup = MetadataManager.INSTANCE.getNodegroup(readMdTxn, nodeGroupName);
+            if (nodegroup == null) {
+                throw new AssertionError("nodegroup was not found after metadata txn commit");
+            }
+        } finally {
+            MetadataManager.INSTANCE.commitTransaction(readMdTxn);
+        }
+    }
+
     private void addDataset(ICcApplicationContext appCtx, Dataset source, int datasetPostfix, boolean abort)
             throws Exception {
         Dataset dataset = new Dataset(source.getDataverseName(), "ds_" + datasetPostfix, source.getDataverseName(),
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
new file mode 100644
index 0000000..f43f3ff
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.txn;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.TransactionUtil;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.test.common.TestTupleReference;
+import org.apache.asterix.transaction.management.service.logging.LogManager;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogManagerTest {
+
+    protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+    private static final String PREPARE_NEXT_LOG_FILE_METHOD = "prepareNextLogFile";
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+        integrationUtil.init(true, TEST_CONFIG_FILE_NAME);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    @Test
+    public void interruptedLogPageSwitch() throws Exception {
+        final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+        final String nodeId = ncAppCtx.getServiceContext().getNodeId();
+
+        final String datasetName = "ds";
+        TestDataUtil.createIdOnlyDataset(datasetName);
+        final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName);
+        final String indexPath = getIndexPath(dataset, nodeId);
+        final IDatasetLifecycleManager dclm = ncAppCtx.getDatasetLifecycleManager();
+        dclm.open(indexPath);
+        final ILSMIndex index = (ILSMIndex) dclm.get(indexPath);
+        final long resourceId = ncAppCtx.getLocalResourceRepository().get(indexPath).getId();
+        final DatasetLocalResource datasetLocalResource =
+                (DatasetLocalResource) ncAppCtx.getLocalResourceRepository().get(indexPath).getResource();
+        final ITransactionContext txnCtx = beginTransaction(ncAppCtx, index, resourceId);
+        final ILogManager logManager = ncAppCtx.getTransactionSubsystem().getLogManager();
+        final ILockManager lockManager = ncAppCtx.getTransactionSubsystem().getLockManager();
+        final DatasetId datasetId = new DatasetId(dataset.getDatasetId());
+        final int[] pkFields = dataset.getPrimaryBloomFilterFields();
+        final int fieldsLength = pkFields.length;
+        final TestTupleReference tuple = new TestTupleReference(fieldsLength);
+        tuple.getFields()[0].getDataOutput().write(1);
+        final int partition = datasetLocalResource.getPartition();
+
+        // ensure an already-interrupted thread is interrupted when allocating the next log page
+        final AtomicBoolean interrupted = new AtomicBoolean(false);
+        Thread interruptedTransactor = new Thread(() -> {
+            Thread.currentThread().interrupt();
+            try {
+                for (int i = 0; i < 10000; i++) {
+                    lockManager.lock(datasetId, i, LockManagerConstants.LockMode.S, txnCtx);
+                    LogRecord logRecord = new LogRecord();
+                    TransactionUtil.formEntityCommitLogRecord(logRecord, txnCtx, datasetId.getId(), i, tuple, pkFields,
+                            partition, LogType.ENTITY_COMMIT);
+                    logManager.log(logRecord);
+                }
+            } catch (ACIDException e) {
+                Throwable rootCause = ExceptionUtils.getRootCause(e);
+                if (rootCause instanceof InterruptedException) {
+                    interrupted.set(true);
+                }
+            }
+        });
+        interruptedTransactor.start();
+        interruptedTransactor.join();
+        Assert.assertTrue(interrupted.get());
+
+        // ensure the next thread is able to allocate the next page
+        final AtomicInteger failCount = new AtomicInteger(0);
+        Thread transactor = new Thread(() -> {
+            try {
+                for (int i = 0; i < 10000; i++) {
+                    lockManager.lock(datasetId, i, LockManagerConstants.LockMode.S, txnCtx);
+                    LogRecord logRecord = new LogRecord();
+                    TransactionUtil.formEntityCommitLogRecord(logRecord, txnCtx, datasetId.getId(), i, tuple, pkFields,
+                            partition, LogType.ENTITY_COMMIT);
+                    logManager.log(logRecord);
+                }
+            } catch (Exception e) {
+                failCount.incrementAndGet();
+            }
+        });
+        transactor.start();
+        transactor.join();
+        Assert.assertEquals(0, failCount.get());
+    }
+
+    @Test
+    public void interruptedLogFileSwitch() throws Exception {
+        final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+        final LogManager logManager = (LogManager) ncAppCtx.getTransactionSubsystem().getLogManager();
+        int logFileCountBeforeInterrupt = logManager.getLogFileIds().size();
+
+        // ensure an interrupted transactor creates the next log file but fails to position the log channel
+        final AtomicBoolean interrupted = new AtomicBoolean(false);
+        Thread interruptedTransactor = new Thread(() -> {
+            Thread.currentThread().interrupt();
+            try {
+                prepareNextLogFile(logManager);
+            } catch (Exception e) {
+                Throwable rootCause = ExceptionUtils.getRootCause(e);
+                if (rootCause.getCause() instanceof java.nio.channels.ClosedByInterruptException) {
+                    interrupted.set(true);
+                }
+            }
+        });
+        interruptedTransactor.start();
+        interruptedTransactor.join();
+        // ensure a new log file was created but the thread was interrupted
+        int logFileCountAfterInterrupt = logManager.getLogFileIds().size();
+        Assert.assertEquals(logFileCountBeforeInterrupt + 1, logFileCountAfterInterrupt);
+        Assert.assertTrue(interrupted.get());
+
+        // ensure the next transactor will not create another file
+        final AtomicBoolean failed = new AtomicBoolean(false);
+        Thread transactor = new Thread(() -> {
+            try {
+                prepareNextLogFile(logManager);
+            } catch (Exception e) {
+                failed.set(true);
+            }
+        });
+        transactor.start();
+        transactor.join();
+        // make sure no new files were created and the operation was successful
+        int countAfterTransactor = logManager.getLogFileIds().size();
+        Assert.assertEquals(logFileCountAfterInterrupt, countAfterTransactor);
+        Assert.assertFalse(failed.get());
+
+        // make sure we can still log to the new file
+        interruptedLogPageSwitch();
+    }
+
+    private static String getIndexPath(Dataset dataset, String nodeId) throws Exception {
+        final FileSplit[] datasetSplits = TestDataUtil.getDatasetSplits(integrationUtil, dataset);
+        final Optional<FileSplit> nodeFileSplit =
+                Arrays.stream(datasetSplits).filter(s -> s.getNodeName().equals(nodeId)).findFirst();
+        Assert.assertTrue(nodeFileSplit.isPresent());
+        return nodeFileSplit.get().getPath();
+    }
+
+    private static ITransactionContext beginTransaction(INcApplicationContext ncAppCtx, ILSMIndex index,
+            long resourceId) {
+        final TxnId txnId = new TxnId(1);
+        final TransactionOptions options = new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL);
+        final ITransactionManager transactionManager = ncAppCtx.getTransactionSubsystem().getTransactionManager();
+        final ITransactionContext txnCtx = transactionManager.beginTransaction(txnId, options);
+        txnCtx.register(resourceId, index, NoOpOperationCallback.INSTANCE, true);
+        return txnCtx;
+    }
+
+    private static void prepareNextLogFile(LogManager logManager) throws Exception {
+        Method method;
+        try {
+            method = LogManager.class.getDeclaredMethod(PREPARE_NEXT_LOG_FILE_METHOD);
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    "Couldn't find " + PREPARE_NEXT_LOG_FILE_METHOD + " in LogManager. Was it renamed?");
+        }
+        method.setAccessible(true);
+        method.invoke(logManager);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/log4j2-test.xml b/asterixdb/asterix-app/src/test/resources/log4j2-test.xml
index d17fad7..08eee10 100644
--- a/asterixdb/asterix-app/src/test/resources/log4j2-test.xml
+++ b/asterixdb/asterix-app/src/test/resources/log4j2-test.xml
@@ -27,5 +27,11 @@
     <Logger name="org.apache.asterix.test" level="WARN">
       <AppenderRef ref="Console"/>
     </Logger>
+    <Logger name="org.apache.asterix.transaction.management.service.logging.LogFlusher" level="INFO">
+      <AppenderRef ref="Console"/>
+    </Logger>
+    <Logger name="org.apache.asterix.utils.RebalanceUtil" level="INFO">
+      <AppenderRef ref="Console"/>
+    </Logger>
   </Loggers>
 </Configuration>
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index dcf8250..b67da80 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -125,14 +125,6 @@
         this.fileChannel = fileChannel;
     }
 
-    public void setInitialFlushOffset(long offset) {
-        try {
-            fileChannel.position(offset);
-        } catch (IOException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
     @Override
     public synchronized void setFull() {
         this.full.set(true);
@@ -146,7 +138,7 @@
 
     @Override
     public boolean hasSpace(int logSize) {
-        return appendOffset + logSize <= logPageSize;
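+        // the page may be marked full (e.g., on a log file switch) before appendOffset reaches the page size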
+        return appendOffset + logSize <= logPageSize && !full.get();
     }
 
     @Override
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 4ce9c71..833f8f6 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -24,6 +24,9 @@
 import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -46,6 +49,7 @@
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogManagerProperties;
+import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.transactions.MutableLong;
 import org.apache.asterix.common.transactions.TxnLogFile;
@@ -75,9 +79,9 @@
     private final String nodeId;
     private final FlushLogsLogger flushLogsLogger;
     private final HashMap<Long, Integer> txnLogFileId2ReaderCount = new HashMap<>();
-    protected final long logFileSize;
-    protected final int logPageSize;
-    protected final AtomicLong appendLSN;
+    private final long logFileSize;
+    private final int logPageSize;
+    private final AtomicLong appendLSN;
     /*
      * Mutables
      */
@@ -85,7 +89,7 @@
     private LinkedBlockingQueue<ILogBuffer> flushQ;
     private LinkedBlockingQueue<ILogBuffer> stashQ;
     private FileChannel appendChannel;
-    protected ILogBuffer appendPage;
+    private ILogBuffer appendPage;
     private LogFlusher logFlusher;
     private Future<? extends Object> futureLogFlusher;
     protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
@@ -112,15 +116,19 @@
         flushQ = new LinkedBlockingQueue<>(numLogPages);
         stashQ = new LinkedBlockingQueue<>(numLogPages);
         for (int i = 0; i < numLogPages; i++) {
-            emptyQ.offer(new LogBuffer(txnSubsystem, logPageSize, flushLSN));
+            emptyQ.add(new LogBuffer(txnSubsystem, logPageSize, flushLSN));
         }
         appendLSN.set(initializeLogAnchor(nextLogFileId));
         flushLSN.set(appendLSN.get());
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("LogManager starts logging in LSN: " + appendLSN);
         }
-        appendChannel = getFileChannel(appendLSN.get(), false);
-        getAndInitNewPage(INITIAL_LOG_SIZE);
+        try {
+            setLogPosition(appendLSN.get());
+        } catch (IOException e) {
+            throw new ACIDException(e);
+        }
+        initNewPage(INITIAL_LOG_SIZE);
         logFlusher = new LogFlusher(this, emptyQ, flushQ, stashQ);
         futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher);
         if (!flushLogsLogger.isAlive()) {
@@ -129,55 +137,43 @@
     }
 
     @Override
-    public void log(ILogRecord logRecord) throws ACIDException {
+    public void log(ILogRecord logRecord) {
         if (logRecord.getLogType() == LogType.FLUSH) {
-            flushLogsQ.offer(logRecord);
+            flushLogsQ.add(logRecord);
             return;
         }
         appendToLogTail(logRecord);
     }
 
-    protected void appendToLogTail(ILogRecord logRecord) throws ACIDException {
+    protected void appendToLogTail(ILogRecord logRecord) {
         syncAppendToLogTail(logRecord);
-
-        if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
-                || logRecord.getLogType() == LogType.WAIT) && !logRecord.isFlushed()) {
-            synchronized (logRecord) {
-                while (!logRecord.isFlushed()) {
-                    try {
+        if (waitForFlush(logRecord) && !logRecord.isFlushed()) {
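+            // deliberately ignore interrupts: the caller must observe the commit/abort/wait record flushed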
+            InvokeUtil.doUninterruptibly(() -> {
+                synchronized (logRecord) {
+                    while (!logRecord.isFlushed()) {
                         logRecord.wait();
-                    } catch (InterruptedException e) {
-                        //ignore
                     }
                 }
-            }
+            });
         }
     }
 
-    protected synchronized void syncAppendToLogTail(ILogRecord logRecord) throws ACIDException {
-        if (logRecord.getLogType() != LogType.FLUSH) {
+    protected static boolean waitForFlush(ILogRecord logRecord) {
+        final byte logType = logRecord.getLogType();
+        return logType == LogType.JOB_COMMIT || logType == LogType.ABORT || logType == LogType.WAIT;
+    }
+
+    synchronized void syncAppendToLogTail(ILogRecord logRecord) {
+        if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH) {
             ITransactionContext txnCtx = logRecord.getTxnCtx();
             if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
                 throw new ACIDException(
                         "Aborted txn(" + txnCtx.getTxnId() + ") tried to write non-abort type log record.");
             }
         }
-
-        /*
-         * To eliminate the case where the modulo of the next appendLSN = 0 (the next
-         * appendLSN = the first LSN of the next log file), we do not allow a log to be
-         * written at the last offset of the current file.
-         */
         final int logSize = logRecord.getLogSize();
-        // Make sure the log will not exceed the log file size
-        if (getLogFileOffset(appendLSN.get()) + logSize >= logFileSize) {
-            prepareNextLogFile();
-            prepareNextPage(logSize);
-        } else if (!appendPage.hasSpace(logSize)) {
-            prepareNextPage(logSize);
-        }
+        ensureSpace(logSize);
         appendPage.append(logRecord, appendLSN.get());
-
         if (logRecord.getLogType() == LogType.FLUSH) {
             logRecord.setLSN(appendLSN.get());
         }
@@ -187,70 +183,98 @@
         appendLSN.addAndGet(logSize);
     }
 
-    protected void prepareNextPage(int logSize) {
-        appendPage.setFull();
-        getAndInitNewPage(logSize);
-    }
-
-    protected void getAndInitNewPage(int logSize) {
-        if (logSize > logPageSize) {
-            // before creating a new page, we need to stash a normal sized page since our queues have fixed capacity
-            appendPage = null;
-            while (appendPage == null) {
-                try {
-                    appendPage = emptyQ.take();
-                    stashQ.add(appendPage);
-                } catch (InterruptedException e) {
-                    //ignore
-                }
-            }
-            // for now, alloc a new buffer for each large page
-            // TODO: pool large pages??
-            appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN);
-            appendPage.setFileChannel(appendChannel);
-            flushQ.offer(appendPage);
-        } else {
-            appendPage = null;
-            while (appendPage == null) {
-                try {
-                    appendPage = emptyQ.take();
-                } catch (InterruptedException e) {
-                    //ignore
-                }
-            }
-            appendPage.reset();
-            appendPage.setFileChannel(appendChannel);
-            flushQ.offer(appendPage);
+    private void ensureSpace(int logSize) {
+        if (!fileHasSpace(logSize)) {
+            ensureLastPageFlushed();
+            prepareNextLogFile();
+        }
+        if (!appendPage.hasSpace(logSize)) {
+            prepareNextPage(logSize);
         }
     }
 
-    protected void prepareNextLogFile() {
+    private boolean fileHasSpace(int logSize) {
+        /*
+         * To eliminate the case where the modulo of the next appendLSN = 0 (the next
+         * appendLSN = the first LSN of the next log file), we do not allow a log to be
+         * written at the last offset of the current file.
+         */
+        return getLogFileOffset(appendLSN.get()) + logSize < logFileSize;
+    }
+
+    private void prepareNextPage(int logSize) {
+        appendPage.setFull();
+        initNewPage(logSize);
+    }
+
+    private void initNewPage(int logSize) {
+        boolean largePage = logSize > logPageSize;
+        // if a new large page will be allocated, we need to stash a normal sized page
+        // since our queues have fixed capacity
+        ensureAvailablePage(largePage);
+        if (largePage) {
+            // for now, alloc a new buffer for each large page
+            // TODO: pool large pages??
+            appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN);
+        } else {
+            appendPage.reset();
+        }
+        appendPage.setFileChannel(appendChannel);
+        flushQ.add(appendPage);
+    }
+
+    private void ensureAvailablePage(boolean stash) {
+        final ILogBuffer currentPage = appendPage;
+        appendPage = null;
+        try {
+            appendPage = emptyQ.take();
+            if (stash) {
+                stashQ.add(appendPage);
+            }
+        } catch (InterruptedException e) {
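+            // restore the previous page so the log manager remains usable after the interrupt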
+            appendPage = currentPage;
+            Thread.currentThread().interrupt();
+            throw new ACIDException(e);
+        }
+    }
+
+    private void prepareNextLogFile() {
+        final long nextFileBeginLsn = getNextFileFirstLsn();
+        try {
+            createNextLogFile();
+            setLogPosition(nextFileBeginLsn);
+        } catch (IOException e) {
+            throw new ACIDException(e);
+        }
+        // move appendLSN and flushLSN to the first LSN of the next log file
+        // only after the file was created and the channel was positioned successfully
+        appendLSN.set(nextFileBeginLsn);
+        flushLSN.set(nextFileBeginLsn);
+        LOGGER.info("Created new txn log file with id({}) starting with LSN = {}", getLogFileId(nextFileBeginLsn),
+                nextFileBeginLsn);
+    }
+
+    private long getNextFileFirstLsn() {
+        // add the remaining space in the current file
+        return appendLSN.get() + (logFileSize - getLogFileOffset(appendLSN.get()));
+    }
+
+    private void ensureLastPageFlushed() {
         // Mark the page as the last page so that it will close the output file channel.
         appendPage.setLastPage();
         // Make sure to flush whatever left in the log tail.
         appendPage.setFull();
-        //wait until all log records have been flushed in the current file
         synchronized (flushLSN) {
-            try {
-                while (flushLSN.get() != appendLSN.get()) {
-                    //notification will come from LogBuffer.internalFlush(.)
+            while (flushLSN.get() != appendLSN.get()) {
+                // notification will come from LogBuffer.internalFlush(.)
+                try {
                     flushLSN.wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new ACIDException(e);
                 }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
             }
         }
-        //move appendLSN and flushLSN to the first LSN of the next log file
-        appendLSN.addAndGet(logFileSize - getLogFileOffset(appendLSN.get()));
-        flushLSN.set(appendLSN.get());
-        appendChannel = getFileChannel(appendLSN.get(), true);
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Created new txn log file with id(" + getLogFileId(appendLSN.get()) + ") starting with LSN = "
-                    + appendLSN.get());
-        }
-        //[Notice]
-        //the current log file channel is closed if
-        //LogBuffer.flush() completely flush the last page of the file.
     }
 
     @Override
@@ -386,8 +410,8 @@
                      * The log file which contains the checkpointLSN has been reached.
                      * The oldest log file being accessed by a LogReader has been reached.
                      */
-                    if (id >= checkpointLSNLogFileID
-                            || (txnLogFileId2ReaderCount.containsKey(id) && txnLogFileId2ReaderCount.get(id) > 0)) {
+                    if (id >= checkpointLSNLogFileID || (txnLogFileId2ReaderCount.containsKey(id)
+                            && txnLogFileId2ReaderCount.get(id) > 0)) {
                         break;
                     }
 
@@ -475,11 +499,11 @@
         return logFileIds;
     }
 
-    public String getLogFilePath(long fileId) {
+    private String getLogFilePath(long fileId) {
         return logDir + File.separator + logFilePrefix + "_" + fileId;
     }
 
-    public long getLogFileOffset(long lsn) {
+    private long getLogFileOffset(long lsn) {
         return lsn % logFileSize;
     }
 
@@ -500,28 +524,24 @@
         return (new File(path)).mkdir();
     }
 
-    private FileChannel getFileChannel(long lsn, boolean create) {
-        FileChannel newFileChannel = null;
-        try {
-            long fileId = getLogFileId(lsn);
-            String logFilePath = getLogFilePath(fileId);
-            File file = new File(logFilePath);
-            if (create) {
-                if (!file.createNewFile()) {
-                    throw new IllegalStateException();
-                }
-            } else {
-                if (!file.exists()) {
-                    throw new IllegalStateException();
-                }
-            }
-            RandomAccessFile raf = new RandomAccessFile(new File(logFilePath), "rw");
-            newFileChannel = raf.getChannel();
-            newFileChannel.position(getLogFileOffset(lsn));
-        } catch (IOException e) {
-            throw new IllegalStateException(e);
+    private void createNextLogFile() throws IOException {
+        final long nextFileBeginLsn = getNextFileFirstLsn();
+        final long fileId = getLogFileId(nextFileBeginLsn);
+        final Path nextFilePath = Paths.get(getLogFilePath(fileId));
+        if (nextFilePath.toFile().exists()) {
+            LOGGER.warn("Ignored create log file {} since file already exists", nextFilePath.toString());
+            return;
         }
-        return newFileChannel;
+        Files.createFile(nextFilePath);
+    }
+
+    private void setLogPosition(long lsn) throws IOException {
+        final long fileId = getLogFileId(lsn);
+        final Path targetFilePath = Paths.get(getLogFilePath(fileId));
+        final long targetPosition = getLogFileOffset(lsn);
+        final RandomAccessFile raf = new RandomAccessFile(targetFilePath.toFile(), "rw"); // NOSONAR closed by LogBuffer
+        appendChannel = raf.getChannel();
+        appendChannel.position(targetPosition);
     }
 
     @Override
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 99f365a..9b6a4f9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -25,11 +25,10 @@
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.common.transactions.ITransactionContext;
-import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.utils.InvokeUtil;
 
 public class LogManagerWithReplication extends LogManager {
 
@@ -42,7 +41,7 @@
     }
 
     @Override
-    public void log(ILogRecord logRecord) throws ACIDException {
+    public void log(ILogRecord logRecord) {
         boolean shouldReplicate = logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT;
         if (shouldReplicate) {
             switch (logRecord.getLogType()) {
@@ -66,7 +65,7 @@
 
         //Remote flush logs do not need to be flushed separately since they may not trigger local flush
         if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) {
-            flushLogsQ.offer(logRecord);
+            flushLogsQ.add(logRecord);
             return;
         }
 
@@ -74,7 +73,7 @@
     }
 
     @Override
-    protected void appendToLogTail(ILogRecord logRecord) throws ACIDException {
+    protected void appendToLogTail(ILogRecord logRecord) {
         syncAppendToLogTail(logRecord);
 
         if (logRecord.isReplicated()) {
@@ -82,68 +81,31 @@
                 replicationManager.replicateLog(logRecord);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
+                throw new ACIDException(e);
             }
         }
 
-        if (logRecord.getLogSource() == LogSource.LOCAL) {
-            if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
-                    || logRecord.getLogType() == LogType.WAIT) && !logRecord.isFlushed()) {
+        if (logRecord.getLogSource() == LogSource.LOCAL && waitForFlush(logRecord) && !logRecord.isFlushed()) {
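+            // as in LogManager, ignore interrupts until the flush (and any replica ACK) is observed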
+            InvokeUtil.doUninterruptibly(() -> {
                 synchronized (logRecord) {
                     while (!logRecord.isFlushed()) {
-                        try {
-                            logRecord.wait();
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                        }
+                        logRecord.wait();
                     }
-
                     //wait for job Commit/Abort ACK from replicas
                     if (logRecord.isReplicated() && (logRecord.getLogType() == LogType.JOB_COMMIT
                             || logRecord.getLogType() == LogType.ABORT)) {
                         while (!replicationManager.hasBeenReplicated(logRecord)) {
-                            try {
-                                logRecord.wait();
-                            } catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                            }
+                            logRecord.wait();
                         }
                     }
                 }
-            }
+            });
         }
     }
 
     @Override
-    protected synchronized void syncAppendToLogTail(ILogRecord logRecord) throws ACIDException {
-        if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH) {
-            ITransactionContext txnCtx = logRecord.getTxnCtx();
-            if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
-                throw new ACIDException(
-                        "Aborted txn(" + txnCtx.getTxnId() + ") tried to write non-abort type log record.");
-            }
-        }
-
-        final int logRecordSize = logRecord.getLogSize();
-        // Make sure the log will not exceed the log file size
-        if (getLogFileOffset(appendLSN.get()) + logRecordSize >= logFileSize) {
-            prepareNextLogFile();
-            prepareNextPage(logRecordSize);
-        } else if (!appendPage.hasSpace(logRecordSize)) {
-            prepareNextPage(logRecordSize);
-        }
-        appendPage.append(logRecord, appendLSN.get());
-
-        if (logRecord.getLogType() == LogType.FLUSH) {
-            logRecord.setLSN(appendLSN.get());
-        }
-
-        appendLSN.addAndGet(logRecordSize);
-    }
-
-    @Override
     public void setReplicationManager(IReplicationManager replicationManager) {
         this.replicationManager = replicationManager;
         this.replicationStrategy = replicationManager.getReplicationStrategy();
     }
-
 }