Merge commit 'e6587f628a72f10594e676ec375c1cf26e73ec53' from release-0.9.4-pre-rc into master

Change-Id: I6f23feac3c249745d561359b117cf1e0d573942a
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4a2cf2d..4b14a9c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -59,6 +59,7 @@
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -104,6 +105,7 @@
     private SystemState state;
     private final INCServiceContext serviceCtx;
     private final INcApplicationContext appCtx;
+    private static final TxnId recoveryTxnId = new TxnId(-1);
 
     public RecoveryManager(ITransactionSubsystem txnSubsystem, INCServiceContext serviceCtx) {
         this.serviceCtx = serviceCtx;
@@ -505,21 +507,24 @@
     }
 
     @Override
-    public void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException {
-        long minLSN = getPartitionsMinLSN(partitions);
-        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
-        if (minLSN < readableSmallestLSN) {
-            minLSN = readableSmallestLSN;
-        }
-
+    public synchronized void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush)
+            throws HyracksDataException {
         //replay logs > minLSN that belong to these partitions
         try {
+            checkpointManager.secure(recoveryTxnId);
+            long minLSN = getPartitionsMinLSN(partitions);
+            long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+            if (minLSN < readableSmallestLSN) {
+                minLSN = readableSmallestLSN;
+            }
             replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN);
             if (flush) {
                 appCtx.getDatasetLifecycleManager().flushAllDatasets();
             }
         } catch (IOException | ACIDException e) {
             throw HyracksDataException.create(e);
+        } finally {
+            checkpointManager.completed(recoveryTxnId);
         }
     }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 91d98e5..418282e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -28,16 +28,19 @@
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.data.gen.TupleGenerator;
 import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.nc.RecoveryManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.ICheckpointManager;
-import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -50,20 +53,23 @@
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
-import org.apache.hyracks.util.StorageUtil;
-import org.apache.hyracks.util.StorageUtil.StorageUnit;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
 public class CheckpointingTest {
 
@@ -88,6 +94,8 @@
     private static final String DATASET_NAME = "TestDS";
     private static final String DATA_TYPE_NAME = "DUMMY";
     private static final String NODE_GROUP_NAME = "DEFAULT";
+    private volatile boolean threadException = false;
+    private Throwable exception = null;
 
     @Before
     public void setUp() throws Exception {
@@ -128,7 +136,7 @@
                 VSizeFrame frame = new VSizeFrame(ctx);
                 FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
 
-                IRecoveryManager recoveryManager = nc.getTransactionSubsystem().getRecoveryManager();
+                RecoveryManager recoveryManager = (RecoveryManager) nc.getTransactionSubsystem().getRecoveryManager();
                 ICheckpointManager checkpointManager = nc.getTransactionSubsystem().getCheckpointManager();
                 LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager();
                 // Number of log files after node startup should be one
@@ -178,20 +186,70 @@
 
                 /*
                  * At this point, the low-water mark is not in the initialLowWaterMarkFileId, so
-                 * a checkpoint should delete it.
+                 * a checkpoint should delete it. We will also start a second
+                 * job to ensure that checkpointing coexists peacefully
+                 * with other concurrent readers of the log that request
+                 * deletions to be withheld.
                  */
-                checkpointManager.tryCheckpoint(recoveryManager.getMinFirstLSN());
 
-                // Validate initialLowWaterMarkFileId was deleted
-                for (Long fileId : logManager.getLogFileIds()) {
-                    Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
-                }
+                JobId jobId2 = nc.newJobId();
+                IHyracksTaskContext ctx2 = nc.createTestContext(jobId2, 0, false);
+                nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx2),
+                        new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+                // Prepare insert operation
+                LSMInsertDeleteOperatorNodePushable insertOp2 = nc.getInsertPipeline(ctx2, dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
+                insertOp2.open();
+                VSizeFrame frame2 = new VSizeFrame(ctx2);
+                FrameTupleAppender tupleAppender2 = new FrameTupleAppender(frame2);
+                for (int i = 0; i < 4; i++) {
+                    long lastCkpoint = recoveryManager.getMinFirstLSN();
+                    long lastFileId = logManager.getLogFileId(lastCkpoint);
 
-                if (tupleAppender.getTupleCount() > 0) {
-                    tupleAppender.write(insertOp, true);
+                    checkpointManager.tryCheckpoint(lowWaterMarkLSN);
+                    // Validate initialLowWaterMarkFileId was deleted
+                    for (Long fileId : logManager.getLogFileIds()) {
+                        Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
+                    }
+
+                    while (currentLowWaterMarkLogFileId == lastFileId) {
+                        ITupleReference tuple = tupleGenerator.next();
+                        DataflowUtils.addTupleToFrame(tupleAppender2, tuple, insertOp2);
+                        lowWaterMarkLSN = recoveryManager.getMinFirstLSN();
+                        currentLowWaterMarkLogFileId = logManager.getLogFileId(lowWaterMarkLSN);
+                    }
                 }
-                insertOp.close();
-                nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+                Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
+                    public void uncaughtException(Thread th, Throwable ex) {
+                        threadException = true;
+                        exception = ex;
+                    }
+                };
+
+                Thread t = new Thread(() -> {
+                    TransactionManager spyTxnMgr = spy((TransactionManager) nc.getTransactionManager());
+                    doAnswer((Answer) i -> {
+                        stallAbortTxn(Thread.currentThread(), txnCtx, nc.getTransactionSubsystem(),
+                                (TxnId) i.getArguments()[0]);
+                        return null;
+                    }).when(spyTxnMgr).abortTransaction(any(TxnId.class));
+
+                    spyTxnMgr.abortTransaction(txnCtx.getTxnId());
+                });
+                t.setUncaughtExceptionHandler(h);
+                synchronized (t) {
+                    t.start();
+                    t.wait();
+                }
+                long lockedLSN = recoveryManager.getMinFirstLSN();
+                checkpointManager.tryCheckpoint(lockedLSN);
+                synchronized (t) {
+                    t.notifyAll();
+                }
+                t.join();
+                if (threadException) {
+                    throw exception;
+                }
             } finally {
                 nc.deInit();
             }
@@ -201,6 +259,32 @@
         }
     }
 
+    private void stallAbortTxn(Thread t, ITransactionContext txnCtx, ITransactionSubsystem txnSubsystem, TxnId txnId)
+            throws InterruptedException, HyracksDataException {
+
+        try {
+            if (txnCtx.isWriteTxn()) {
+                LogRecord logRecord = new LogRecord();
+                TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
+                txnSubsystem.getLogManager().log(logRecord);
+                txnSubsystem.getCheckpointManager().secure(txnId);
+                synchronized (t) {
+                    t.notifyAll();
+                    t.wait();
+                }
+                txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+                txnCtx.setTxnState(ITransactionManager.ABORTED);
+            }
+        } catch (ACIDException | HyracksDataException e) {
+            String msg = "Could not complete rollback! System is in an inconsistent state";
+            throw new ACIDException(msg, e);
+        } finally {
+            txnCtx.complete();
+            txnSubsystem.getLockManager().releaseLocks(txnCtx);
+            txnSubsystem.getCheckpointManager().completed(txnId);
+        }
+    }
+
     @Test
     public void testCorruptedCheckpointFiles() {
         try {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index 9e7eb0d..e3cf8b8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -46,4 +46,19 @@
      * @throws HyracksDataException
      */
     long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException;
-}
\ No newline at end of file
+
+    /**
+     * Secures the current low-water mark until the transaction identified by {@code id} completes.
+     *
+     * @param id
+     * @throws HyracksDataException
+     */
+    void secure(TxnId id) throws HyracksDataException;
+
+    /**
+     * Notifies this {@link ICheckpointManager} that the transaction identified by {@code id} completed.
+     *
+     * @param id
+     */
+    void completed(TxnId id);
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
index b7f0985..4bd97c1 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
@@ -52,7 +52,7 @@
             final File indexFile = ioManager.resolve(file).getFile();
             if (indexFile.exists()) {
                 File indexDir = indexFile.getParentFile();
-                IoUtil.deleteDirectory(indexDir);
+                IoUtil.delete(indexDir);
                 LOGGER.info(() -> "Deleted index: " + indexFile.getAbsolutePath());
             } else {
                 LOGGER.warning(() -> "Requested to delete a non-existing index: " + indexFile.getAbsolutePath());
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 3ada608..736de07 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -76,7 +76,6 @@
     private final String logFilePrefix;
     private final MutableLong flushLSN;
     private final String nodeId;
-    private final HashMap<Long, Integer> txnLogFileId2ReaderCount = new HashMap<>();
     private final long logFileSize;
     private final int logPageSize;
     private final AtomicLong appendLSN;
@@ -407,24 +406,20 @@
             /**
              * At this point, any future LogReader should read from LSN >= checkpointLSN
              */
-            synchronized (txnLogFileId2ReaderCount) {
-                for (Long id : logFileIds) {
-                    /**
-                     * Stop deletion if:
-                     * The log file which contains the checkpointLSN has been reached.
-                     * The oldest log file being accessed by a LogReader has been reached.
-                     */
-                    if (id >= checkpointLSNLogFileID
-                            || (txnLogFileId2ReaderCount.containsKey(id) && txnLogFileId2ReaderCount.get(id) > 0)) {
-                        break;
-                    }
-                    //delete old log file
-                    File file = new File(getLogFilePath(id));
-                    file.delete();
-                    txnLogFileId2ReaderCount.remove(id);
-                    if (LOGGER.isInfoEnabled()) {
-                        LOGGER.info("Deleted log file " + file.getAbsolutePath());
-                    }
+            for (Long id : logFileIds) {
+                /**
+                 * Stop deletion if:
+                 * The log file which contains the checkpointLSN has been reached.
+                 * Files in use by a LogReader are no longer checked here (reader-count tracking was removed).
+                 */
+                if (id >= checkpointLSNLogFileID) {
+                    break;
+                }
+                //delete old log file
+                File file = new File(getLogFilePath(id));
+                file.delete();
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info("Deleted log file " + file.getAbsolutePath());
                 }
             }
         }
@@ -450,7 +445,6 @@
     }
 
     private long deleteAllLogFiles() {
-        txnLogFileId2ReaderCount.clear();
         List<Long> logFileIds = getLogFileIds();
         if (!logFileIds.isEmpty()) {
             for (Long id : logFileIds) {
@@ -607,7 +601,6 @@
         RandomAccessFile raf = new RandomAccessFile(new File(logFilePath), "r");
         FileChannel newFileChannel = raf.getChannel();
         TxnLogFile logFile = new TxnLogFile(this, newFileChannel, fileId, fileId * logFileSize);
-        touchLogFile(fileId);
         return logFile;
     }
 
@@ -617,32 +610,6 @@
             LOGGER.warn(() -> "Closing log file with id(" + logFileRef.getLogFileId() + ") with a closed channel.");
         }
         fileChannel.close();
-        untouchLogFile(logFileRef.getLogFileId());
-    }
-
-    private void touchLogFile(long fileId) {
-        synchronized (txnLogFileId2ReaderCount) {
-            if (txnLogFileId2ReaderCount.containsKey(fileId)) {
-                txnLogFileId2ReaderCount.put(fileId, txnLogFileId2ReaderCount.get(fileId) + 1);
-            } else {
-                txnLogFileId2ReaderCount.put(fileId, 1);
-            }
-        }
-    }
-
-    private void untouchLogFile(long fileId) {
-        synchronized (txnLogFileId2ReaderCount) {
-            if (txnLogFileId2ReaderCount.containsKey(fileId)) {
-                int newReaderCount = txnLogFileId2ReaderCount.get(fileId) - 1;
-                if (newReaderCount < 0) {
-                    throw new IllegalStateException(
-                            "Invalid log file reader count (ID=" + fileId + ", count: " + newReaderCount + ")");
-                }
-                txnLogFileId2ReaderCount.put(fileId, newReaderCount);
-            } else {
-                throw new IllegalStateException("Trying to close log file id(" + fileId + ") which was not opened.");
-            }
-        }
     }
 
     /**
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index a541bd9..6efd0e5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -22,10 +22,15 @@
 import org.apache.asterix.common.transactions.CheckpointProperties;
 import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * An implementation of {@link ICheckpointManager} that defines the logic
  * of checkpoints.
@@ -33,9 +38,12 @@
 public class CheckpointManager extends AbstractCheckpointManager {
 
     private static final Logger LOGGER = LogManager.getLogger();
+    private static final long NO_SECURED_LSN = -1l;
+    private final Map<TxnId, Long> securedLSNs;
 
     public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
         super(txnSubsystem, checkpointProperties);
+        securedLSNs = new HashMap<>();
     }
 
     /**
@@ -62,6 +70,10 @@
     @Override
     public synchronized long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException {
         LOGGER.info("Attemping soft checkpoint...");
+        final long minSecuredLSN = getMinSecuredLSN();
+        if (minSecuredLSN != NO_SECURED_LSN && checkpointTargetLSN >= minSecuredLSN) {
+            return minSecuredLSN;
+        }
         final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
         boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
         if (!checkpointSucceeded) {
@@ -77,4 +89,18 @@
         }
         return minFirstLSN;
     }
-}
\ No newline at end of file
+
+    @Override
+    public synchronized void secure(TxnId id) throws HyracksDataException {
+        securedLSNs.put(id, txnSubsystem.getRecoveryManager().getMinFirstLSN());
+    }
+
+    @Override
+    public synchronized void completed(TxnId id) {
+        securedLSNs.remove(id);
+    }
+
+    private synchronized long getMinSecuredLSN() {
+        return securedLSNs.isEmpty() ? NO_SECURED_LSN : Collections.min(securedLSNs.values());
+    }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 76ecc63..c218dec 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -33,6 +33,7 @@
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.TransactionUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.Level;
@@ -103,10 +104,11 @@
                 LogRecord logRecord = new LogRecord();
                 TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
                 txnSubsystem.getLogManager().log(logRecord);
+                txnSubsystem.getCheckpointManager().secure(txnId);
                 txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
                 txnCtx.setTxnState(ITransactionManager.ABORTED);
             }
-        } catch (ACIDException e) {
+        } catch (HyracksDataException e) {
             String msg = "Could not complete rollback! System is in an inconsistent state";
             if (LOGGER.isErrorEnabled()) {
                 LOGGER.log(Level.ERROR, msg, e);
@@ -116,6 +118,7 @@
             txnCtx.complete();
             txnSubsystem.getLockManager().releaseLocks(txnCtx);
             txnCtxRepository.remove(txnCtx.getTxnId());
+            txnSubsystem.getCheckpointManager().completed(txnId);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 83ab532..d499554 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
@@ -44,6 +45,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.util.ExceptionUtils;
 
 /**
  * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
@@ -193,15 +195,20 @@
     }
 
     private void runInParallel(OperatorNodePushableAction action) throws HyracksDataException {
-        List<Future<Void>> tasks = new ArrayList<>();
+        List<Future<Void>> tasks = new ArrayList<>(operatorNodePushablesBFSOrder.size());
+        Queue<Throwable> failures = new ArrayBlockingQueue<>(operatorNodePushablesBFSOrder.size());
         final Semaphore startSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
         final Semaphore completeSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
+        Throwable root = null;
         try {
             for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
                 tasks.add(ctx.getExecutorService().submit(() -> {
                     startSemaphore.release();
                     try {
                         action.run(op);
+                    } catch (Throwable th) { // NOSONAR: Must catch all causes of failure
+                        failures.offer(th);
+                        throw th;
                     } finally {
                         completeSemaphore.release();
                     }
@@ -211,13 +218,16 @@
             for (Future<Void> task : tasks) {
                 task.get();
             }
-        } catch (InterruptedException e) {
-            cancelTasks(tasks, startSemaphore, completeSemaphore);
-            Thread.currentThread().interrupt();
-            throw HyracksDataException.create(e);
         } catch (ExecutionException e) {
+            root = e.getCause();
+        } catch (Throwable e) { // NOSONAR: Must catch all causes of failure
+            root = e;
+        }
+        if (root != null) {
+            final Throwable failure = root;
             cancelTasks(tasks, startSemaphore, completeSemaphore);
-            throw HyracksDataException.create(e.getCause());
+            failures.forEach(t -> ExceptionUtils.suppress(failure, t));
+            throw HyracksDataException.create(failure);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index 396c026..03227ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -46,30 +46,29 @@
     /**
      * Delete a file
      *
-     * @param fileRef
-     *            the file to be deleted
-     * @throws HyracksDataException
-     *             if the file couldn't be deleted
+     * @param fileRef the file to be deleted
+     * @throws HyracksDataException if the file couldn't be deleted
      */
     public static void delete(FileReference fileRef) throws HyracksDataException {
         delete(fileRef.getFile());
     }
 
     /**
-     * Delete a file
+     * Delete a file or directory
      *
-     * @param file
-     *            the file to be deleted
-     * @throws HyracksDataException
-     *             if the file couldn't be deleted
+     * @param file the file or directory to be deleted
+     * @throws HyracksDataException if the file or directory couldn't be deleted
      */
     public static void delete(File file) throws HyracksDataException {
         try {
             if (file.isDirectory()) {
-                deleteDirectory(file);
-            } else {
-                Files.delete(file.toPath());
+                if (!file.exists()) {
+                    return;
+                } else if (!FileUtils.isSymlink(file)) {
+                    cleanDirectory(file);
+                }
             }
+            Files.delete(file.toPath());
         } catch (NoSuchFileException | FileNotFoundException e) {
             LOGGER.warn(() -> FILE_NOT_FOUND_MSG + ": " + e.getMessage(), e);
         } catch (IOException e) {
@@ -80,10 +79,8 @@
     /**
      * Create a file on disk
      *
-     * @param fileRef
-     *            the file to create
-     * @throws HyracksDataException
-     *             if the file already exists or if it couldn't be created
+     * @param fileRef the file to create
+     * @throws HyracksDataException if the file already exists or if it couldn't be created
      */
     public static void create(FileReference fileRef) throws HyracksDataException {
         if (fileRef.getFile().exists()) {
@@ -99,17 +96,7 @@
         }
     }
 
-    public static void deleteDirectory(File directory) throws IOException {
-        if (!directory.exists()) {
-            return;
-        }
-        if (!FileUtils.isSymlink(directory)) {
-            cleanDirectory(directory);
-        }
-        Files.delete(directory.toPath());
-    }
-
-    public static void cleanDirectory(final File directory) throws IOException {
+    private static void cleanDirectory(final File directory) throws IOException {
         final File[] files = verifiedListFiles(directory);
         for (final File file : files) {
             delete(file);
@@ -133,4 +120,5 @@
         }
         return files;
     }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index 1c6c98e..dce7d35 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -18,10 +18,15 @@
  */
 package org.apache.hyracks.control.nc;
 
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.controllers.NodeParameters;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
+import org.apache.hyracks.util.ExitUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -31,6 +36,7 @@
 
     private final IClusterController ccs;
     private boolean registrationPending;
+    private boolean registrationCompleted;
     private Exception registrationException;
     private NodeParameters nodeParameters;
 
@@ -57,12 +63,14 @@
     public synchronized CcId registerNode(NodeRegistration nodeRegistration, int registrationId) throws Exception {
         registrationPending = true;
         ccs.registerNode(nodeRegistration, registrationId);
-        while (registrationPending) {
-            wait();
+        try {
+            InvokeUtil.runWithTimeout(this::wait, () -> !registrationPending, 2, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            registrationException = e;
         }
         if (registrationException != null) {
-            LOGGER.log(Level.WARN, "Registering with {} failed with exception", this, registrationException);
-            throw registrationException;
+            LOGGER.fatal("Registering with {} failed with exception", this, registrationException);
+            ExitUtil.halt(ExitUtil.EC_IMMEDIATE_HALT);
         }
         return getCcId();
     }
@@ -74,4 +82,27 @@
     public NodeParameters getNodeParameters() {
         return nodeParameters;
     }
+
+    public synchronized void notifyConnectionRestored(NodeControllerService ncs, InetSocketAddress ccAddress)
+            throws InterruptedException {
+        if (registrationCompleted) {
+            registrationCompleted = false;
+            ncs.getExecutor().submit(() -> {
+                try {
+                    return ncs.registerNode(this, ccAddress);
+                } catch (Exception e) {
+                    LOGGER.log(Level.ERROR, "Failed registering with cc", e);
+                    throw new IllegalStateException(e);
+                }
+            });
+        }
+        while (!registrationCompleted) {
+            wait();
+        }
+    }
+
+    public synchronized void notifyRegistrationCompleted() {
+        registrationCompleted = true;
+        notifyAll();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0756210..98f5c70 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -352,10 +352,11 @@
                 @Override
                 public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
                     // we need to re-register in case of NC -> CC connection reset
+                    final CcConnection ccConnection = getCcConnection(ccAddressMap.get(ccAddress));
                     try {
-                        registerNode(getCcConnection(ccAddressMap.get(ccAddress)), ccAddress);
-                    } catch (Exception e) {
-                        LOGGER.log(Level.WARN, "Failed Registering with cc", e);
+                        ccConnection.notifyConnectionRestored(NodeControllerService.this, ccAddress);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
                         throw new IPCException(e);
                     }
                 }
@@ -412,7 +413,7 @@
         }
     }
 
-    private CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
+    public CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
         LOGGER.info("Registering with Cluster Controller {}", ccc);
 
         int registrationId = nextRegistrationId.incrementAndGet();
@@ -444,7 +445,7 @@
             ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, nodeParameters.getProfileDumpPeriod());
             ccTimers.put(ccId, ccTimer);
         }
-
+        ccc.notifyRegistrationCompleted();
         LOGGER.info("Registering with Cluster Controller {} complete", ccc);
         return ccId;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index dcfc291..9d99968 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -102,7 +102,7 @@
 
     private volatile boolean aborted;
 
-    private NodeControllerService ncs;
+    private final NodeControllerService ncs;
 
     private List<List<PartitionChannel>> inputChannelsFromConnectors;
 
@@ -286,67 +286,62 @@
         }
         ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
         try {
-            Exception operatorException = null;
+            Throwable operatorException = null;
             try {
                 operator.initialize();
                 if (collectors.length > 0) {
                     final Semaphore sem = new Semaphore(collectors.length - 1);
                     for (int i = 1; i < collectors.length; ++i) {
+                        // Q. Do we ever have a task that has more than one collector?
                         final IPartitionCollector collector = collectors[i];
                         final IFrameWriter writer = operator.getInputFrameWriter(i);
-                        sem.acquire();
+                        sem.acquireUninterruptibly();
                         final int cIdx = i;
                         executorService.execute(() -> {
-                            Thread thread = Thread.currentThread();
-                            // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
-                            // the thread is not escaped from interruption.
-                            if (!addPendingThread(thread)) {
-                                return;
-                            }
-                            thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
-                            thread.setPriority(Thread.MIN_PRIORITY);
                             try {
-                                pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
-                            } catch (HyracksDataException e) {
-                                synchronized (Task.this) {
-                                    exceptions.add(e);
+                                Thread thread = Thread.currentThread();
+                                // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
+                                // the thread is not escaped from interruption.
+                                if (!addPendingThread(thread)) {
+                                    return;
+                                }
+                                thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+                                thread.setPriority(Thread.MIN_PRIORITY);
+                                try {
+                                    pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
+                                } catch (HyracksDataException e) {
+                                    synchronized (Task.this) {
+                                        exceptions.add(e);
+                                    }
+                                } finally {
+                                    removePendingThread(thread);
                                 }
                             } finally {
                                 sem.release();
-                                removePendingThread(thread);
                             }
                         });
                     }
                     try {
                         pushFrames(collectors[0], inputChannelsFromConnectors.get(0), operator.getInputFrameWriter(0));
                     } finally {
-                        sem.acquire(collectors.length - 1);
+                        sem.acquireUninterruptibly(collectors.length - 1);
                     }
                 }
-            } catch (Exception e) {
-                // Store the operator exception
+            } catch (Throwable e) { // NOSONAR: Must catch all failures
                 operatorException = e;
-                throw e;
             } finally {
                 try {
                     operator.deinitialize();
-                } catch (Exception e) {
-                    if (operatorException != null) {
-                        // Add deinitialize exception to the operator exception to keep track of both
-                        operatorException.addSuppressed(e);
-                    } else {
-                        operatorException = e;
-                    }
-                    throw operatorException;
+                } catch (Throwable e) { // NOSONAR: Must catch all failures
+                    operatorException = ExceptionUtils.suppress(operatorException, e);
                 }
             }
-            NodeControllerService ncs = joblet.getNodeController();
+            if (operatorException != null) {
+                throw operatorException;
+            }
             ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
-        } catch (InterruptedException e) {
-            exceptions.add(e);
-            Thread.currentThread().interrupt();
-        } catch (Exception e) {
-            exceptions.add(e);
+        } catch (Throwable e) { // NOSONAR: Catch all failures
+            exceptions.add(HyracksDataException.create(e));
         } finally {
             close();
             removePendingThread(ct);
@@ -360,7 +355,6 @@
                             exceptions.get(i));
                 }
             }
-            NodeControllerService ncs = joblet.getNodeController();
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
             ncs.getWorkQueue()
                     .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(), taskAttemptId));
@@ -457,6 +451,7 @@
         return ncs.createOrGetJobParameterByteStore(joblet.getJobId()).getParameterValue(name, start, length);
     }
 
+    @Override
     public Set<JobFlag> getJobFlags() {
         return jobFlags;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
index a120296..a6c8393 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
@@ -40,6 +40,7 @@
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMBTreeDiskComponentScanCursor extends LSMIndexSearchCursor {
 
@@ -77,15 +78,20 @@
             BTree btree = (BTree) component.getIndex();
             btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             rangeCursors[i] = btreeAccessors[i].createSearchCursor(false);
-            btreeAccessors[i].search(rangeCursors[i], searchPred);
         }
-
-        cursorIndexPointable = new IntegerPointable();
-        int length = IntegerPointable.TYPE_TRAITS.getFixedLength();
-        cursorIndexPointable.set(new byte[length], 0, length);
-
-        setPriorityQueueComparator();
-        initPriorityQueue();
+        IndexCursorUtils.open(btreeAccessors, rangeCursors, searchPred);
+        try {
+            cursorIndexPointable = new IntegerPointable();
+            int length = IntegerPointable.TYPE_TRAITS.getFixedLength();
+            cursorIndexPointable.set(new byte[length], 0, length);
+            setPriorityQueueComparator();
+            initPriorityQueue();
+        } catch (Throwable th) { // NOSONAR: Must catch all failures to close the cursors before rethrowing
+            for (int i = 0; i < numBTrees; i++) {
+                IndexCursorUtils.close(rangeCursors[i], th);
+            }
+            throw HyracksDataException.create(th);
+        }
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 0cb7d1c..bfda985 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -42,6 +42,7 @@
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor {
     private final ArrayTupleReference copyTuple;
@@ -349,7 +350,6 @@
             rangeCursors = new IIndexCursor[numBTrees];
             btreeAccessors = new BTreeAccessor[numBTrees];
         }
-
         for (int i = 0; i < numBTrees; i++) {
             ILSMComponent component = operationalComponents.get(i);
             BTree btree;
@@ -365,12 +365,16 @@
                 btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
                 rangeCursors[i].close();
             }
-            btreeAccessors[i].search(rangeCursors[i], searchPred);
         }
-
-        setPriorityQueueComparator();
-        initPriorityQueue();
-        canCallProceed = true;
+        IndexCursorUtils.open(btreeAccessors, rangeCursors, searchPred);
+        try {
+            setPriorityQueueComparator();
+            initPriorityQueue();
+            canCallProceed = true;
+        } catch (Throwable th) { // NOSONAR Must catch all
+            IndexCursorUtils.close(rangeCursors, th);
+            throw HyracksDataException.create(th);
+        }
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java
index 2913be1..b600f12 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.common.ICursorInitialState;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMBTreeWithBuddySortedCursor extends LSMBTreeWithBuddyAbstractCursor {
     // TODO: This class can be removed and instead use a search cursor that uses
@@ -160,12 +161,21 @@
         foundNext = false;
         for (int i = 0; i < numberOfTrees; i++) {
             btreeCursors[i].close();
-            btreeAccessors[i].search(btreeCursors[i], btreeRangePredicate);
-            if (btreeCursors[i].hasNext()) {
-                btreeCursors[i].next();
-            } else {
-                depletedBtreeCursors[i] = true;
+        }
+        IndexCursorUtils.open(btreeAccessors, btreeCursors, btreeRangePredicate);
+        try {
+            for (int i = 0; i < numberOfTrees; i++) {
+                if (btreeCursors[i].hasNext()) {
+                    btreeCursors[i].next();
+                } else {
+                    depletedBtreeCursors[i] = true;
+                }
             }
+        } catch (Throwable th) { // NOSONAR Must catch all failures to close before throwing
+            for (int i = 0; i < numberOfTrees; i++) {
+                IndexCursorUtils.close(btreeCursors[i], th);
+            }
+            throw HyracksDataException.create(th);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java
index fd13e62..b35c5d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMBuddyBTreeMergeCursor extends LSMIndexSearchCursor {
 
@@ -55,7 +56,6 @@
         lsmHarness = null;
         int numBTrees = operationalComponents.size();
         rangeCursors = new IIndexCursor[numBTrees];
-
         RangePredicate btreePredicate = new RangePredicate(null, null, true, true, cmp, cmp);
         IIndexAccessor[] btreeAccessors = new ITreeIndexAccessor[numBTrees];
         for (int i = 0; i < numBTrees; i++) {
@@ -64,9 +64,14 @@
             rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
             BTree buddyBtree = ((LSMBTreeWithBuddyDiskComponent) component).getBuddyIndex();
             btreeAccessors[i] = buddyBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            btreeAccessors[i].search(rangeCursors[i], btreePredicate);
         }
-        setPriorityQueueComparator();
-        initPriorityQueue();
+        IndexCursorUtils.open(btreeAccessors, rangeCursors, btreePredicate);
+        try {
+            setPriorityQueueComparator();
+            initPriorityQueue();
+        } catch (Throwable th) { // NOSONAR: Must catch all failures
+            IndexCursorUtils.close(rangeCursors, th);
+            throw HyracksDataException.create(th);
+        }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 62493f4..a8467d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -61,6 +61,15 @@
 
     void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException;
 
+    /**
+     * If this method returns successfully, then the cursor has been opened and needs to be closed.
+     * Otherwise, the cursor has not been opened.
+     *
+     * @param ictx
+     * @param cursor
+     * @param pred
+     * @throws HyracksDataException
+     */
     void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException;
 
     public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
index 21ce940..f1f5241 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMInvertedIndexDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
 
@@ -48,7 +49,8 @@
                 (LSMInvertedIndexRangeSearchCursorInitialState) initialState;
         cmp = lsmInitialState.getOriginalKeyComparator();
         operationalComponents = lsmInitialState.getOperationalComponents();
-        // We intentionally set the lsmHarness to null so that we don't call lsmHarness.endSearch() because we already do that when we merge the inverted indexes.
+        // We intentionally set the lsmHarness to null so that we don't call lsmHarness.endSearch() because we already
+        // do that when we merge the inverted indexes.
         lsmHarness = null;
         int numBTrees = operationalComponents.size();
         rangeCursors = new IIndexCursor[numBTrees];
@@ -60,7 +62,13 @@
             rangeCursors[i] = btreeAccessors.get(i).createSearchCursor(false);
             btreeAccessors.get(i).search(rangeCursors[i], btreePredicate);
         }
-        setPriorityQueueComparator();
-        initPriorityQueue();
+        IndexCursorUtils.open(btreeAccessors, rangeCursors, btreePredicate);
+        try {
+            setPriorityQueueComparator();
+            initPriorityQueue();
+        } catch (Throwable th) { // NOSONAR: Must catch all failures
+            IndexCursorUtils.close(rangeCursors, th);
+            throw HyracksDataException.create(th);
+        }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
index 892fe83..b57b517 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
@@ -33,6 +33,7 @@
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMRTreeDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
 
@@ -65,7 +66,13 @@
             btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             btreeAccessors[i].search(rangeCursors[i], btreePredicate);
         }
-        setPriorityQueueComparator();
-        initPriorityQueue();
+        IndexCursorUtils.open(btreeAccessors, rangeCursors, btreePredicate);
+        try {
+            setPriorityQueueComparator();
+            initPriorityQueue();
+        } catch (Throwable th) { // NOSONAR: Must catch all failures
+            IndexCursorUtils.close(rangeCursors, th);
+            throw HyracksDataException.create(th);
+        }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
index 9bbc3e1..483e082 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.common.ICursorInitialState;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMRTreeSortedCursor extends LSMRTreeAbstractCursor {
 
@@ -163,14 +164,19 @@
         super.doOpen(initialState, searchPred);
         depletedRtreeCursors = new boolean[numberOfTrees];
         foundNext = false;
-        for (int i = 0; i < numberOfTrees; i++) {
-            rtreeCursors[i].close();
-            rtreeAccessors[i].search(rtreeCursors[i], rtreeSearchPredicate);
-            if (rtreeCursors[i].hasNext()) {
-                rtreeCursors[i].next();
-            } else {
-                depletedRtreeCursors[i] = true;
+        try {
+            for (int i = 0; i < numberOfTrees; i++) {
+                rtreeCursors[i].close();
+                rtreeAccessors[i].search(rtreeCursors[i], rtreeSearchPredicate);
+                if (rtreeCursors[i].hasNext()) {
+                    rtreeCursors[i].next();
+                } else {
+                    depletedRtreeCursors[i] = true;
+                }
             }
+        } catch (Throwable th) { // NOSONAR. Must catch all failures
+            IndexCursorUtils.close(rtreeCursors, th);
+            throw HyracksDataException.create(th);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java
index 449c711..427575b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.storage.common.ICursorInitialState;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMRTreeWithAntiMatterTuplesFlushCursor extends EnforcedIndexCursor implements ILSMIndexCursor {
     private final TreeTupleSorter rTreeTupleSorter;
@@ -49,17 +50,13 @@
 
     @Override
     public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
-        boolean rtreeOpen = false;
-        boolean btreeOpen = false;
         try {
             rTreeTupleSorter.open(initialState, searchPred);
-            rtreeOpen = true;
             bTreeTupleSorter.open(initialState, searchPred);
-            btreeOpen = true;
-        } finally {
-            if (rtreeOpen && !btreeOpen) {
-                rTreeTupleSorter.close();
-            }
+        } catch (Throwable th) { // NOSONAR: Must catch all failures
+            IndexCursorUtils.close(bTreeTupleSorter, th);
+            IndexCursorUtils.close(rTreeTupleSorter, th);
+            throw HyracksDataException.create(th);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index 094acbc..7db65bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -42,6 +42,7 @@
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMRTreeWithAntiMatterTuplesSearchCursor extends LSMIndexSearchCursor {
 
@@ -113,19 +114,25 @@
         rangeCursors = new RTreeSearchCursor[numImmutableComponents];
         ITreeIndexAccessor[] immutableRTreeAccessors = new ITreeIndexAccessor[numImmutableComponents];
         int j = 0;
-        for (int i = numMemoryComponents; i < operationalComponents.size(); i++) {
-            ILSMComponent component = operationalComponents.get(i);
-            rangeCursors[j] = new RTreeSearchCursor(
-                    (IRTreeInteriorFrame) lsmInitialState.getRTreeInteriorFrameFactory().createFrame(),
-                    (IRTreeLeafFrame) lsmInitialState.getRTreeLeafFrameFactory().createFrame());
-            RTree rtree = ((LSMRTreeWithAntimatterDiskComponent) component).getIndex();
-            immutableRTreeAccessors[j] = rtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            immutableRTreeAccessors[j].search(rangeCursors[j], searchPred);
-            j++;
+        try {
+            for (int i = numMemoryComponents; i < operationalComponents.size(); i++) {
+                ILSMComponent component = operationalComponents.get(i);
+                rangeCursors[j] = new RTreeSearchCursor(
+                        (IRTreeInteriorFrame) lsmInitialState.getRTreeInteriorFrameFactory().createFrame(),
+                        (IRTreeLeafFrame) lsmInitialState.getRTreeLeafFrameFactory().createFrame());
+                RTree rtree = ((LSMRTreeWithAntimatterDiskComponent) component).getIndex();
+                immutableRTreeAccessors[j] = rtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+                immutableRTreeAccessors[j].search(rangeCursors[j], searchPred);
+                j++;
+            }
+            searchNextCursor();
+            setPriorityQueueComparator();
+            initPriorityQueue();
+        } catch (Throwable th) { // NOSONAR: Must catch all failures
+            IndexCursorUtils.close(rangeCursors, th);
+            IndexCursorUtils.close(mutableRTreeCursors, th);
+            throw HyracksDataException.create(th);
         }
-        searchNextCursor();
-        setPriorityQueueComparator();
-        initPriorityQueue();
         open = true;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/util/IndexCursorUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/util/IndexCursorUtils.java
new file mode 100644
index 0000000..7587a2a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/util/IndexCursorUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.common.util;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.storage.common.IIndexAccessor;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class IndexCursorUtils { // static helpers for opening and closing groups of index cursors
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private IndexCursorUtils() { // private constructor: the class exposes only static methods
+    }
+
+    /**
+     * Close the IIndexCursor (a null cursor is skipped) and suppress any Throwable thrown by the close call.
+     * This method must NEVER throw any Throwable; a close failure is logged at WARN level instead.
+     *
+     * @param cursor
+     *            the cursor to close, may be null
+     * @param root
+     *            the first exception encountered during release of resources, may be null
+     * @return root unchanged if close succeeds, otherwise the result of ExceptionUtils.suppress(root, failure)
+     */
+    public static Throwable close(IIndexCursor cursor, Throwable root) {
+        if (cursor != null) {
+            try {
+                cursor.close();
+            } catch (Throwable th) { // NOSONAR Will be suppressed
+                try {
+                    LOGGER.log(Level.WARN, "Failure closing a cursor", th);
+                } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable
+                    // NOSONAR ignore logging failure so that it cannot escape this method
+                }
+                root = ExceptionUtils.suppress(root, th); // NOSONAR: Using the same variable is not bad in this context
+            }
+        }
+        return root;
+    }
+
+    public static void open(List<IIndexAccessor> accessors, IIndexCursor[] cursors, ISearchPredicate pred)
+            throws HyracksDataException { // list variant: the loop below is bounded by cursors.length
+        int opened = 0; // slots [0, opened) were processed before any failure
+        try {
+            for (int i = 0; i < cursors.length; i++) {
+                if (accessors.get(i) != null) { // a null accessor is skipped but its slot still counts as processed
+                    accessors.get(i).search(cursors[i], pred);
+                }
+                opened++;
+            }
+        } catch (Throwable th) { // NOSONAR: Must catch all failures
+            for (int j = 0; j < opened; j++) { // the slot whose search failed is not included
+                IndexCursorUtils.close(cursors[j], th); // close failures are handed to close() together with th
+            }
+            throw HyracksDataException.create(th);
+        }
+    }
+
+    public static void open(IIndexAccessor[] accessors, IIndexCursor[] cursors, ISearchPredicate pred)
+            throws HyracksDataException { // array variant: the loop below is bounded by accessors.length
+        int opened = 0; // slots [0, opened) were processed before any failure
+        try {
+            for (int i = 0; i < accessors.length; i++) {
+                if (accessors[i] != null) { // a null accessor is skipped but its slot still counts as processed
+                    accessors[i].search(cursors[i], pred);
+                }
+                opened++;
+            }
+        } catch (Throwable th) { // NOSONAR: Must catch all failures
+            for (int j = 0; j < opened; j++) { // the slot whose search failed is not included
+                IndexCursorUtils.close(cursors[j], th); // close failures are handed to close() together with th
+            }
+            throw HyracksDataException.create(th);
+        }
+    }
+
+    public static Throwable close(IIndexCursor[] cursors, Throwable th) { // closes every element, in index order
+        for (int j = 0; j < cursors.length; j++) {
+            th = IndexCursorUtils.close(cursors[j], th); // NOSONAR: Using the same variable is cleaner in this context
+        }
+        return th; // whatever the last single-cursor close returned (the incoming th if the array is empty)
+    }
+
+}