Merge commit 'e6587f628a72f10594e676ec375c1cf26e73ec53' from release-0.9.4-pre-rc into master
Change-Id: I6f23feac3c249745d561359b117cf1e0d573942a
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4a2cf2d..4b14a9c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -59,6 +59,7 @@
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -104,6 +105,7 @@
private SystemState state;
private final INCServiceContext serviceCtx;
private final INcApplicationContext appCtx;
+ private static final TxnId recoveryTxnId = new TxnId(-1);
public RecoveryManager(ITransactionSubsystem txnSubsystem, INCServiceContext serviceCtx) {
this.serviceCtx = serviceCtx;
@@ -505,21 +507,24 @@
}
@Override
- public void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException {
- long minLSN = getPartitionsMinLSN(partitions);
- long readableSmallestLSN = logMgr.getReadableSmallestLSN();
- if (minLSN < readableSmallestLSN) {
- minLSN = readableSmallestLSN;
- }
-
+ public synchronized void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush)
+ throws HyracksDataException {
//replay logs > minLSN that belong to these partitions
try {
+ checkpointManager.secure(recoveryTxnId);
+ long minLSN = getPartitionsMinLSN(partitions);
+ long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+ if (minLSN < readableSmallestLSN) {
+ minLSN = readableSmallestLSN;
+ }
replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN);
if (flush) {
appCtx.getDatasetLifecycleManager().flushAllDatasets();
}
} catch (IOException | ACIDException e) {
throw HyracksDataException.create(e);
+ } finally {
+ checkpointManager.completed(recoveryTxnId);
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 91d98e5..418282e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -28,16 +28,19 @@
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.data.gen.TupleGenerator;
import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.nc.RecoveryManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.Checkpoint;
import org.apache.asterix.common.transactions.ICheckpointManager;
-import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -50,20 +53,23 @@
import org.apache.asterix.test.common.TestHelper;
import org.apache.asterix.transaction.management.service.logging.LogManager;
import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
-import org.apache.hyracks.util.StorageUtil;
-import org.apache.hyracks.util.StorageUtil.StorageUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
public class CheckpointingTest {
@@ -88,6 +94,8 @@
private static final String DATASET_NAME = "TestDS";
private static final String DATA_TYPE_NAME = "DUMMY";
private static final String NODE_GROUP_NAME = "DEFAULT";
+ private volatile boolean threadException = false;
+ private Throwable exception = null;
@Before
public void setUp() throws Exception {
@@ -128,7 +136,7 @@
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
- IRecoveryManager recoveryManager = nc.getTransactionSubsystem().getRecoveryManager();
+ RecoveryManager recoveryManager = (RecoveryManager) nc.getTransactionSubsystem().getRecoveryManager();
ICheckpointManager checkpointManager = nc.getTransactionSubsystem().getCheckpointManager();
LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager();
// Number of log files after node startup should be one
@@ -178,20 +186,70 @@
/*
* At this point, the low-water mark is not in the initialLowWaterMarkFileId, so
- * a checkpoint should delete it.
+ * a checkpoint should delete it. We will also start a second
+ * job to ensure that the checkpointing coexists peacefully
+ * with other concurrent readers of the log that request
+ * deletions to be witheld
*/
- checkpointManager.tryCheckpoint(recoveryManager.getMinFirstLSN());
- // Validate initialLowWaterMarkFileId was deleted
- for (Long fileId : logManager.getLogFileIds()) {
- Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
- }
+ JobId jobId2 = nc.newJobId();
+ IHyracksTaskContext ctx2 = nc.createTestContext(jobId2, 0, false);
+ nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx2),
+ new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+ // Prepare insert operation
+ LSMInsertDeleteOperatorNodePushable insertOp2 = nc.getInsertPipeline(ctx2, dataset, KEY_TYPES,
+ RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
+ insertOp2.open();
+ VSizeFrame frame2 = new VSizeFrame(ctx2);
+ FrameTupleAppender tupleAppender2 = new FrameTupleAppender(frame2);
+ for (int i = 0; i < 4; i++) {
+ long lastCkpoint = recoveryManager.getMinFirstLSN();
+ long lastFileId = logManager.getLogFileId(lastCkpoint);
- if (tupleAppender.getTupleCount() > 0) {
- tupleAppender.write(insertOp, true);
+ checkpointManager.tryCheckpoint(lowWaterMarkLSN);
+ // Validate initialLowWaterMarkFileId was deleted
+ for (Long fileId : logManager.getLogFileIds()) {
+ Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
+ }
+
+ while (currentLowWaterMarkLogFileId == lastFileId) {
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender2, tuple, insertOp2);
+ lowWaterMarkLSN = recoveryManager.getMinFirstLSN();
+ currentLowWaterMarkLogFileId = logManager.getLogFileId(lowWaterMarkLSN);
+ }
}
- insertOp.close();
- nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+ Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(Thread th, Throwable ex) {
+ threadException = true;
+ exception = ex;
+ }
+ };
+
+ Thread t = new Thread(() -> {
+ TransactionManager spyTxnMgr = spy((TransactionManager) nc.getTransactionManager());
+ doAnswer((Answer) i -> {
+ stallAbortTxn(Thread.currentThread(), txnCtx, nc.getTransactionSubsystem(),
+ (TxnId) i.getArguments()[0]);
+ return null;
+ }).when(spyTxnMgr).abortTransaction(any(TxnId.class));
+
+ spyTxnMgr.abortTransaction(txnCtx.getTxnId());
+ });
+ t.setUncaughtExceptionHandler(h);
+ synchronized (t) {
+ t.start();
+ t.wait();
+ }
+ long lockedLSN = recoveryManager.getMinFirstLSN();
+ checkpointManager.tryCheckpoint(lockedLSN);
+ synchronized (t) {
+ t.notifyAll();
+ }
+ t.join();
+ if (threadException) {
+ throw exception;
+ }
} finally {
nc.deInit();
}
@@ -201,6 +259,32 @@
}
}
+ private void stallAbortTxn(Thread t, ITransactionContext txnCtx, ITransactionSubsystem txnSubsystem, TxnId txnId)
+ throws InterruptedException, HyracksDataException {
+
+ try {
+ if (txnCtx.isWriteTxn()) {
+ LogRecord logRecord = new LogRecord();
+ TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
+ txnSubsystem.getLogManager().log(logRecord);
+ txnSubsystem.getCheckpointManager().secure(txnId);
+ synchronized (t) {
+ t.notifyAll();
+ t.wait();
+ }
+ txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+ txnCtx.setTxnState(ITransactionManager.ABORTED);
+ }
+ } catch (ACIDException | HyracksDataException e) {
+ String msg = "Could not complete rollback! System is in an inconsistent state";
+ throw new ACIDException(msg, e);
+ } finally {
+ txnCtx.complete();
+ txnSubsystem.getLockManager().releaseLocks(txnCtx);
+ txnSubsystem.getCheckpointManager().completed(txnId);
+ }
+ }
+
@Test
public void testCorruptedCheckpointFiles() {
try {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index 9e7eb0d..e3cf8b8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -46,4 +46,19 @@
* @throws HyracksDataException
*/
long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException;
-}
\ No newline at end of file
+
+ /**
+ * Secures the current low-water mark until the transaction identified by {@code id} completes.
+ *
+ * @param id
+ * @throws HyracksDataException
+ */
+ void secure(TxnId id) throws HyracksDataException;
+
+ /**
+ * Notifies this {@link ICheckpointManager} that the transaction identified by {@code id} completed.
+ *
+ * @param id
+ */
+ void completed(TxnId id);
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
index b7f0985..4bd97c1 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
@@ -52,7 +52,7 @@
final File indexFile = ioManager.resolve(file).getFile();
if (indexFile.exists()) {
File indexDir = indexFile.getParentFile();
- IoUtil.deleteDirectory(indexDir);
+ IoUtil.delete(indexDir);
LOGGER.info(() -> "Deleted index: " + indexFile.getAbsolutePath());
} else {
LOGGER.warning(() -> "Requested to delete a non-existing index: " + indexFile.getAbsolutePath());
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 3ada608..736de07 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -76,7 +76,6 @@
private final String logFilePrefix;
private final MutableLong flushLSN;
private final String nodeId;
- private final HashMap<Long, Integer> txnLogFileId2ReaderCount = new HashMap<>();
private final long logFileSize;
private final int logPageSize;
private final AtomicLong appendLSN;
@@ -407,24 +406,20 @@
/**
* At this point, any future LogReader should read from LSN >= checkpointLSN
*/
- synchronized (txnLogFileId2ReaderCount) {
- for (Long id : logFileIds) {
- /**
- * Stop deletion if:
- * The log file which contains the checkpointLSN has been reached.
- * The oldest log file being accessed by a LogReader has been reached.
- */
- if (id >= checkpointLSNLogFileID
- || (txnLogFileId2ReaderCount.containsKey(id) && txnLogFileId2ReaderCount.get(id) > 0)) {
- break;
- }
- //delete old log file
- File file = new File(getLogFilePath(id));
- file.delete();
- txnLogFileId2ReaderCount.remove(id);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Deleted log file " + file.getAbsolutePath());
- }
+ for (Long id : logFileIds) {
+ /**
+ * Stop deletion if:
+ * The log file which contains the checkpointLSN has been reached.
+ * The oldest log file being accessed by a LogReader has been reached.
+ */
+ if (id >= checkpointLSNLogFileID) {
+ break;
+ }
+ //delete old log file
+ File file = new File(getLogFilePath(id));
+ file.delete();
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Deleted log file " + file.getAbsolutePath());
}
}
}
@@ -450,7 +445,6 @@
}
private long deleteAllLogFiles() {
- txnLogFileId2ReaderCount.clear();
List<Long> logFileIds = getLogFileIds();
if (!logFileIds.isEmpty()) {
for (Long id : logFileIds) {
@@ -607,7 +601,6 @@
RandomAccessFile raf = new RandomAccessFile(new File(logFilePath), "r");
FileChannel newFileChannel = raf.getChannel();
TxnLogFile logFile = new TxnLogFile(this, newFileChannel, fileId, fileId * logFileSize);
- touchLogFile(fileId);
return logFile;
}
@@ -617,32 +610,6 @@
LOGGER.warn(() -> "Closing log file with id(" + logFileRef.getLogFileId() + ") with a closed channel.");
}
fileChannel.close();
- untouchLogFile(logFileRef.getLogFileId());
- }
-
- private void touchLogFile(long fileId) {
- synchronized (txnLogFileId2ReaderCount) {
- if (txnLogFileId2ReaderCount.containsKey(fileId)) {
- txnLogFileId2ReaderCount.put(fileId, txnLogFileId2ReaderCount.get(fileId) + 1);
- } else {
- txnLogFileId2ReaderCount.put(fileId, 1);
- }
- }
- }
-
- private void untouchLogFile(long fileId) {
- synchronized (txnLogFileId2ReaderCount) {
- if (txnLogFileId2ReaderCount.containsKey(fileId)) {
- int newReaderCount = txnLogFileId2ReaderCount.get(fileId) - 1;
- if (newReaderCount < 0) {
- throw new IllegalStateException(
- "Invalid log file reader count (ID=" + fileId + ", count: " + newReaderCount + ")");
- }
- txnLogFileId2ReaderCount.put(fileId, newReaderCount);
- } else {
- throw new IllegalStateException("Trying to close log file id(" + fileId + ") which was not opened.");
- }
- }
}
/**
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index a541bd9..6efd0e5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -22,10 +22,15 @@
import org.apache.asterix.common.transactions.CheckpointProperties;
import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* An implementation of {@link ICheckpointManager} that defines the logic
* of checkpoints.
@@ -33,9 +38,12 @@
public class CheckpointManager extends AbstractCheckpointManager {
private static final Logger LOGGER = LogManager.getLogger();
+ private static final long NO_SECURED_LSN = -1l;
+ private final Map<TxnId, Long> securedLSNs;
public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
super(txnSubsystem, checkpointProperties);
+ securedLSNs = new HashMap<>();
}
/**
@@ -62,6 +70,10 @@
@Override
public synchronized long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException {
LOGGER.info("Attemping soft checkpoint...");
+ final long minSecuredLSN = getMinSecuredLSN();
+ if (minSecuredLSN != NO_SECURED_LSN && checkpointTargetLSN >= minSecuredLSN) {
+ return minSecuredLSN;
+ }
final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
if (!checkpointSucceeded) {
@@ -77,4 +89,18 @@
}
return minFirstLSN;
}
-}
\ No newline at end of file
+
+ @Override
+ public synchronized void secure(TxnId id) throws HyracksDataException {
+ securedLSNs.put(id, txnSubsystem.getRecoveryManager().getMinFirstLSN());
+ }
+
+ @Override
+ public synchronized void completed(TxnId id) {
+ securedLSNs.remove(id);
+ }
+
+ private synchronized long getMinSecuredLSN() {
+ return securedLSNs.isEmpty() ? NO_SECURED_LSN : Collections.min(securedLSNs.values());
+ }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 76ecc63..c218dec 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -33,6 +33,7 @@
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.common.utils.TransactionUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.util.annotations.ThreadSafe;
import org.apache.logging.log4j.Level;
@@ -103,10 +104,11 @@
LogRecord logRecord = new LogRecord();
TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
txnSubsystem.getLogManager().log(logRecord);
+ txnSubsystem.getCheckpointManager().secure(txnId);
txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
txnCtx.setTxnState(ITransactionManager.ABORTED);
}
- } catch (ACIDException e) {
+ } catch (HyracksDataException e) {
String msg = "Could not complete rollback! System is in an inconsistent state";
if (LOGGER.isErrorEnabled()) {
LOGGER.log(Level.ERROR, msg, e);
@@ -116,6 +118,7 @@
txnCtx.complete();
txnSubsystem.getLockManager().releaseLocks(txnCtx);
txnCtxRepository.remove(txnCtx.getTxnId());
+ txnSubsystem.getCheckpointManager().completed(txnId);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 83ab532..d499554 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
@@ -44,6 +45,7 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.util.ExceptionUtils;
/**
* The runtime of a SuperActivity, which internally executes a DAG of one-to-one
@@ -193,15 +195,20 @@
}
private void runInParallel(OperatorNodePushableAction action) throws HyracksDataException {
- List<Future<Void>> tasks = new ArrayList<>();
+ List<Future<Void>> tasks = new ArrayList<>(operatorNodePushablesBFSOrder.size());
+ Queue<Throwable> failures = new ArrayBlockingQueue<>(operatorNodePushablesBFSOrder.size());
final Semaphore startSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
final Semaphore completeSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
+ Throwable root = null;
try {
for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
tasks.add(ctx.getExecutorService().submit(() -> {
startSemaphore.release();
try {
action.run(op);
+ } catch (Throwable th) { // NOSONAR: Must catch all causes of failure
+ failures.offer(th);
+ throw th;
} finally {
completeSemaphore.release();
}
@@ -211,13 +218,16 @@
for (Future<Void> task : tasks) {
task.get();
}
- } catch (InterruptedException e) {
- cancelTasks(tasks, startSemaphore, completeSemaphore);
- Thread.currentThread().interrupt();
- throw HyracksDataException.create(e);
} catch (ExecutionException e) {
+ root = e.getCause();
+ } catch (Throwable e) { // NOSONAR: Must catch all causes of failure
+ root = e;
+ }
+ if (root != null) {
+ final Throwable failure = root;
cancelTasks(tasks, startSemaphore, completeSemaphore);
- throw HyracksDataException.create(e.getCause());
+ failures.forEach(t -> ExceptionUtils.suppress(failure, t));
+ throw HyracksDataException.create(failure);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index 396c026..03227ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -46,30 +46,29 @@
/**
* Delete a file
*
- * @param fileRef
- * the file to be deleted
- * @throws HyracksDataException
- * if the file couldn't be deleted
+ * @param fileRef the file to be deleted
+ * @throws HyracksDataException if the file couldn't be deleted
*/
public static void delete(FileReference fileRef) throws HyracksDataException {
delete(fileRef.getFile());
}
/**
- * Delete a file
+ * Delete a file or directory
*
- * @param file
- * the file to be deleted
- * @throws HyracksDataException
- * if the file couldn't be deleted
+ * @param file the file to be deleted
+ * @throws HyracksDataException if the file (or directory if exists) couldn't be deleted
*/
public static void delete(File file) throws HyracksDataException {
try {
if (file.isDirectory()) {
- deleteDirectory(file);
- } else {
- Files.delete(file.toPath());
+ if (!file.exists()) {
+ return;
+ } else if (!FileUtils.isSymlink(file)) {
+ cleanDirectory(file);
+ }
}
+ Files.delete(file.toPath());
} catch (NoSuchFileException | FileNotFoundException e) {
LOGGER.warn(() -> FILE_NOT_FOUND_MSG + ": " + e.getMessage(), e);
} catch (IOException e) {
@@ -80,10 +79,8 @@
/**
* Create a file on disk
*
- * @param fileRef
- * the file to create
- * @throws HyracksDataException
- * if the file already exists or if it couldn't be created
+ * @param fileRef the file to create
+ * @throws HyracksDataException if the file already exists or if it couldn't be created
*/
public static void create(FileReference fileRef) throws HyracksDataException {
if (fileRef.getFile().exists()) {
@@ -99,17 +96,7 @@
}
}
- public static void deleteDirectory(File directory) throws IOException {
- if (!directory.exists()) {
- return;
- }
- if (!FileUtils.isSymlink(directory)) {
- cleanDirectory(directory);
- }
- Files.delete(directory.toPath());
- }
-
- public static void cleanDirectory(final File directory) throws IOException {
+ private static void cleanDirectory(final File directory) throws IOException {
final File[] files = verifiedListFiles(directory);
for (final File file : files) {
delete(file);
@@ -133,4 +120,5 @@
}
return files;
}
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index 1c6c98e..dce7d35 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -18,10 +18,15 @@
*/
package org.apache.hyracks.control.nc;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.control.common.base.IClusterController;
import org.apache.hyracks.control.common.controllers.NodeParameters;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
+import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -31,6 +36,7 @@
private final IClusterController ccs;
private boolean registrationPending;
+ private boolean registrationCompleted;
private Exception registrationException;
private NodeParameters nodeParameters;
@@ -57,12 +63,14 @@
public synchronized CcId registerNode(NodeRegistration nodeRegistration, int registrationId) throws Exception {
registrationPending = true;
ccs.registerNode(nodeRegistration, registrationId);
- while (registrationPending) {
- wait();
+ try {
+ InvokeUtil.runWithTimeout(this::wait, () -> !registrationPending, 2, TimeUnit.MINUTES);
+ } catch (Exception e) {
+ registrationException = e;
}
if (registrationException != null) {
- LOGGER.log(Level.WARN, "Registering with {} failed with exception", this, registrationException);
- throw registrationException;
+ LOGGER.fatal("Registering with {} failed with exception", this, registrationException);
+ ExitUtil.halt(ExitUtil.EC_IMMEDIATE_HALT);
}
return getCcId();
}
@@ -74,4 +82,27 @@
public NodeParameters getNodeParameters() {
return nodeParameters;
}
+
+ public synchronized void notifyConnectionRestored(NodeControllerService ncs, InetSocketAddress ccAddress)
+ throws InterruptedException {
+ if (registrationCompleted) {
+ registrationCompleted = false;
+ ncs.getExecutor().submit(() -> {
+ try {
+ return ncs.registerNode(this, ccAddress);
+ } catch (Exception e) {
+ LOGGER.log(Level.ERROR, "Failed registering with cc", e);
+ throw new IllegalStateException(e);
+ }
+ });
+ }
+ while (!registrationCompleted) {
+ wait();
+ }
+ }
+
+ public synchronized void notifyRegistrationCompleted() {
+ registrationCompleted = true;
+ notifyAll();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0756210..98f5c70 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -352,10 +352,11 @@
@Override
public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
// we need to re-register in case of NC -> CC connection reset
+ final CcConnection ccConnection = getCcConnection(ccAddressMap.get(ccAddress));
try {
- registerNode(getCcConnection(ccAddressMap.get(ccAddress)), ccAddress);
- } catch (Exception e) {
- LOGGER.log(Level.WARN, "Failed Registering with cc", e);
+ ccConnection.notifyConnectionRestored(NodeControllerService.this, ccAddress);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new IPCException(e);
}
}
@@ -412,7 +413,7 @@
}
}
- private CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
+ public CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
LOGGER.info("Registering with Cluster Controller {}", ccc);
int registrationId = nextRegistrationId.incrementAndGet();
@@ -444,7 +445,7 @@
ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, nodeParameters.getProfileDumpPeriod());
ccTimers.put(ccId, ccTimer);
}
-
+ ccc.notifyRegistrationCompleted();
LOGGER.info("Registering with Cluster Controller {} complete", ccc);
return ccId;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index dcfc291..9d99968 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -102,7 +102,7 @@
private volatile boolean aborted;
- private NodeControllerService ncs;
+ private final NodeControllerService ncs;
private List<List<PartitionChannel>> inputChannelsFromConnectors;
@@ -286,67 +286,62 @@
}
ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
try {
- Exception operatorException = null;
+ Throwable operatorException = null;
try {
operator.initialize();
if (collectors.length > 0) {
final Semaphore sem = new Semaphore(collectors.length - 1);
for (int i = 1; i < collectors.length; ++i) {
+ // Q. Do we ever have a task that has more than one collector?
final IPartitionCollector collector = collectors[i];
final IFrameWriter writer = operator.getInputFrameWriter(i);
- sem.acquire();
+ sem.acquireUninterruptibly();
final int cIdx = i;
executorService.execute(() -> {
- Thread thread = Thread.currentThread();
- // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
- // the thread is not escaped from interruption.
- if (!addPendingThread(thread)) {
- return;
- }
- thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
- thread.setPriority(Thread.MIN_PRIORITY);
try {
- pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
- } catch (HyracksDataException e) {
- synchronized (Task.this) {
- exceptions.add(e);
+ Thread thread = Thread.currentThread();
+ // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
+ // the thread is not escaped from interruption.
+ if (!addPendingThread(thread)) {
+ return;
+ }
+ thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+ thread.setPriority(Thread.MIN_PRIORITY);
+ try {
+ pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
+ } catch (HyracksDataException e) {
+ synchronized (Task.this) {
+ exceptions.add(e);
+ }
+ } finally {
+ removePendingThread(thread);
}
} finally {
sem.release();
- removePendingThread(thread);
}
});
}
try {
pushFrames(collectors[0], inputChannelsFromConnectors.get(0), operator.getInputFrameWriter(0));
} finally {
- sem.acquire(collectors.length - 1);
+ sem.acquireUninterruptibly(collectors.length - 1);
}
}
- } catch (Exception e) {
- // Store the operator exception
+ } catch (Throwable e) { // NOSONAR: Must catch all failures
operatorException = e;
- throw e;
} finally {
try {
operator.deinitialize();
- } catch (Exception e) {
- if (operatorException != null) {
- // Add deinitialize exception to the operator exception to keep track of both
- operatorException.addSuppressed(e);
- } else {
- operatorException = e;
- }
- throw operatorException;
+ } catch (Throwable e) { // NOSONAR: Must catch all failures
+ operatorException = ExceptionUtils.suppress(operatorException, e);
}
}
- NodeControllerService ncs = joblet.getNodeController();
+ if (operatorException != null) {
+ throw operatorException;
+ }
ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
- } catch (InterruptedException e) {
- exceptions.add(e);
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- exceptions.add(e);
+ } catch (Throwable e) { // NOSONAR: Catch all failures
+ exceptions.add(HyracksDataException.create(e));
} finally {
close();
removePendingThread(ct);
@@ -360,7 +355,6 @@
exceptions.get(i));
}
}
- NodeControllerService ncs = joblet.getNodeController();
ExceptionUtils.setNodeIds(exceptions, ncs.getId());
ncs.getWorkQueue()
.schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(), taskAttemptId));
@@ -457,6 +451,7 @@
return ncs.createOrGetJobParameterByteStore(joblet.getJobId()).getParameterValue(name, start, length);
}
+ @Override
public Set<JobFlag> getJobFlags() {
return jobFlags;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
index a120296..a6c8393 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
@@ -40,6 +40,7 @@
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMBTreeDiskComponentScanCursor extends LSMIndexSearchCursor {
@@ -77,15 +78,20 @@
BTree btree = (BTree) component.getIndex();
btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
rangeCursors[i] = btreeAccessors[i].createSearchCursor(false);
- btreeAccessors[i].search(rangeCursors[i], searchPred);
}
-
- cursorIndexPointable = new IntegerPointable();
- int length = IntegerPointable.TYPE_TRAITS.getFixedLength();
- cursorIndexPointable.set(new byte[length], 0, length);
-
- setPriorityQueueComparator();
- initPriorityQueue();
+ IndexCursorUtils.open(btreeAccessors, rangeCursors, searchPred);
+ try {
+ cursorIndexPointable = new IntegerPointable();
+ int length = IntegerPointable.TYPE_TRAITS.getFixedLength();
+ cursorIndexPointable.set(new byte[length], 0, length);
+ setPriorityQueueComparator();
+ initPriorityQueue();
+ } catch (Throwable th) { // NOSONAR: Must call this on
+ for (int i = 0; i < numBTrees; i++) {
+ IndexCursorUtils.close(rangeCursors[i], th);
+ }
+ throw HyracksDataException.create(th);
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 0cb7d1c..bfda985 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -42,6 +42,7 @@
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor {
private final ArrayTupleReference copyTuple;
@@ -349,7 +350,6 @@
rangeCursors = new IIndexCursor[numBTrees];
btreeAccessors = new BTreeAccessor[numBTrees];
}
-
for (int i = 0; i < numBTrees; i++) {
ILSMComponent component = operationalComponents.get(i);
BTree btree;
@@ -365,12 +365,16 @@
btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
rangeCursors[i].close();
}
- btreeAccessors[i].search(rangeCursors[i], searchPred);
}
-
- setPriorityQueueComparator();
- initPriorityQueue();
- canCallProceed = true;
+ IndexCursorUtils.open(btreeAccessors, rangeCursors, searchPred);
+ try {
+ setPriorityQueueComparator();
+ initPriorityQueue();
+ canCallProceed = true;
+ } catch (Throwable th) { // NOSONAR Must catch all
+ IndexCursorUtils.close(rangeCursors, th);
+ throw HyracksDataException.create(th);
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java
index 2913be1..b600f12 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMBTreeWithBuddySortedCursor extends LSMBTreeWithBuddyAbstractCursor {
// TODO: This class can be removed and instead use a search cursor that uses
@@ -160,12 +161,21 @@
foundNext = false;
for (int i = 0; i < numberOfTrees; i++) {
btreeCursors[i].close();
- btreeAccessors[i].search(btreeCursors[i], btreeRangePredicate);
- if (btreeCursors[i].hasNext()) {
- btreeCursors[i].next();
- } else {
- depletedBtreeCursors[i] = true;
+ }
+ IndexCursorUtils.open(btreeAccessors, btreeCursors, btreeRangePredicate);
+ try {
+ for (int i = 0; i < numberOfTrees; i++) {
+ if (btreeCursors[i].hasNext()) {
+ btreeCursors[i].next();
+ } else {
+ depletedBtreeCursors[i] = true;
+ }
}
+ } catch (Throwable th) { // NOSONAR Must catch all failures to close before throwing
+ for (int i = 0; i < numberOfTrees; i++) {
+ IndexCursorUtils.close(btreeCursors[i], th);
+ }
+ throw HyracksDataException.create(th);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java
index fd13e62..b35c5d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMBuddyBTreeMergeCursor extends LSMIndexSearchCursor {
@@ -55,7 +56,6 @@
lsmHarness = null;
int numBTrees = operationalComponents.size();
rangeCursors = new IIndexCursor[numBTrees];
-
RangePredicate btreePredicate = new RangePredicate(null, null, true, true, cmp, cmp);
IIndexAccessor[] btreeAccessors = new ITreeIndexAccessor[numBTrees];
for (int i = 0; i < numBTrees; i++) {
@@ -64,9 +64,14 @@
rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
BTree buddyBtree = ((LSMBTreeWithBuddyDiskComponent) component).getBuddyIndex();
btreeAccessors[i] = buddyBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- btreeAccessors[i].search(rangeCursors[i], btreePredicate);
}
- setPriorityQueueComparator();
- initPriorityQueue();
+ IndexCursorUtils.open(btreeAccessors, rangeCursors, btreePredicate);
+ try {
+ setPriorityQueueComparator();
+ initPriorityQueue();
+ } catch (Throwable th) { // NOSONAR: Must catch all failures
+ IndexCursorUtils.close(rangeCursors, th);
+ throw HyracksDataException.create(th);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 62493f4..a8467d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -61,6 +61,15 @@
void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException;
+ /**
+ * If this method returns successfully, then the cursor has been opened, and need to be closed
+ * Otherwise, it has not been opened
+ *
+ * @param ictx
+ * @param cursor
+ * @param pred
+ * @throws HyracksDataException
+ */
void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException;
public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
index 21ce940..f1f5241 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMInvertedIndexDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
@@ -48,7 +49,8 @@
(LSMInvertedIndexRangeSearchCursorInitialState) initialState;
cmp = lsmInitialState.getOriginalKeyComparator();
operationalComponents = lsmInitialState.getOperationalComponents();
- // We intentionally set the lsmHarness to null so that we don't call lsmHarness.endSearch() because we already do that when we merge the inverted indexes.
+ // We intentionally set the lsmHarness to null so that we don't call lsmHarness.endSearch() because we already
+ // do that when we merge the inverted indexes.
lsmHarness = null;
int numBTrees = operationalComponents.size();
rangeCursors = new IIndexCursor[numBTrees];
@@ -60,7 +62,13 @@
rangeCursors[i] = btreeAccessors.get(i).createSearchCursor(false);
btreeAccessors.get(i).search(rangeCursors[i], btreePredicate);
}
- setPriorityQueueComparator();
- initPriorityQueue();
+ IndexCursorUtils.open(btreeAccessors, rangeCursors, btreePredicate);
+ try {
+ setPriorityQueueComparator();
+ initPriorityQueue();
+ } catch (Throwable th) { // NOSONAR: Must catch all failures
+ IndexCursorUtils.close(rangeCursors, th);
+ throw HyracksDataException.create(th);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
index 892fe83..b57b517 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
@@ -33,6 +33,7 @@
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMRTreeDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
@@ -65,7 +66,13 @@
btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
btreeAccessors[i].search(rangeCursors[i], btreePredicate);
}
- setPriorityQueueComparator();
- initPriorityQueue();
+ IndexCursorUtils.open(btreeAccessors, rangeCursors, btreePredicate);
+ try {
+ setPriorityQueueComparator();
+ initPriorityQueue();
+ } catch (Throwable th) { // NOSONAR: Must catch all failures
+ IndexCursorUtils.close(rangeCursors, th);
+ throw HyracksDataException.create(th);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
index 9bbc3e1..483e082 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMRTreeSortedCursor extends LSMRTreeAbstractCursor {
@@ -163,14 +164,19 @@
super.doOpen(initialState, searchPred);
depletedRtreeCursors = new boolean[numberOfTrees];
foundNext = false;
- for (int i = 0; i < numberOfTrees; i++) {
- rtreeCursors[i].close();
- rtreeAccessors[i].search(rtreeCursors[i], rtreeSearchPredicate);
- if (rtreeCursors[i].hasNext()) {
- rtreeCursors[i].next();
- } else {
- depletedRtreeCursors[i] = true;
+ try {
+ for (int i = 0; i < numberOfTrees; i++) {
+ rtreeCursors[i].close();
+ rtreeAccessors[i].search(rtreeCursors[i], rtreeSearchPredicate);
+ if (rtreeCursors[i].hasNext()) {
+ rtreeCursors[i].next();
+ } else {
+ depletedRtreeCursors[i] = true;
+ }
}
+ } catch (Throwable th) { // NOSONAR. Must catch all failures
+ IndexCursorUtils.close(rtreeCursors, th);
+ throw HyracksDataException.create(th);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java
index 449c711..427575b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.storage.common.ICursorInitialState;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMRTreeWithAntiMatterTuplesFlushCursor extends EnforcedIndexCursor implements ILSMIndexCursor {
private final TreeTupleSorter rTreeTupleSorter;
@@ -49,17 +50,13 @@
@Override
public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
- boolean rtreeOpen = false;
- boolean btreeOpen = false;
try {
rTreeTupleSorter.open(initialState, searchPred);
- rtreeOpen = true;
bTreeTupleSorter.open(initialState, searchPred);
- btreeOpen = true;
- } finally {
- if (rtreeOpen && !btreeOpen) {
- rTreeTupleSorter.close();
- }
+ } catch (Throwable th) { // NOSONAR: Must catch all failures
+ IndexCursorUtils.close(bTreeTupleSorter, th);
+ IndexCursorUtils.close(rTreeTupleSorter, th);
+ throw HyracksDataException.create(th);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index 094acbc..7db65bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -42,6 +42,7 @@
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
public class LSMRTreeWithAntiMatterTuplesSearchCursor extends LSMIndexSearchCursor {
@@ -113,19 +114,25 @@
rangeCursors = new RTreeSearchCursor[numImmutableComponents];
ITreeIndexAccessor[] immutableRTreeAccessors = new ITreeIndexAccessor[numImmutableComponents];
int j = 0;
- for (int i = numMemoryComponents; i < operationalComponents.size(); i++) {
- ILSMComponent component = operationalComponents.get(i);
- rangeCursors[j] = new RTreeSearchCursor(
- (IRTreeInteriorFrame) lsmInitialState.getRTreeInteriorFrameFactory().createFrame(),
- (IRTreeLeafFrame) lsmInitialState.getRTreeLeafFrameFactory().createFrame());
- RTree rtree = ((LSMRTreeWithAntimatterDiskComponent) component).getIndex();
- immutableRTreeAccessors[j] = rtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- immutableRTreeAccessors[j].search(rangeCursors[j], searchPred);
- j++;
+ try {
+ for (int i = numMemoryComponents; i < operationalComponents.size(); i++) {
+ ILSMComponent component = operationalComponents.get(i);
+ rangeCursors[j] = new RTreeSearchCursor(
+ (IRTreeInteriorFrame) lsmInitialState.getRTreeInteriorFrameFactory().createFrame(),
+ (IRTreeLeafFrame) lsmInitialState.getRTreeLeafFrameFactory().createFrame());
+ RTree rtree = ((LSMRTreeWithAntimatterDiskComponent) component).getIndex();
+ immutableRTreeAccessors[j] = rtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ immutableRTreeAccessors[j].search(rangeCursors[j], searchPred);
+ j++;
+ }
+ searchNextCursor();
+ setPriorityQueueComparator();
+ initPriorityQueue();
+ } catch (Throwable th) { // NOSONAR: Must catch all failures
+ IndexCursorUtils.close(rangeCursors, th);
+ IndexCursorUtils.close(mutableRTreeCursors, th);
+ throw HyracksDataException.create(th);
}
- searchNextCursor();
- setPriorityQueueComparator();
- initPriorityQueue();
open = true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/util/IndexCursorUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/util/IndexCursorUtils.java
new file mode 100644
index 0000000..7587a2a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/util/IndexCursorUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.common.util;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.storage.common.IIndexAccessor;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class IndexCursorUtils {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private IndexCursorUtils() {
+ }
+
+ /**
+ * Close the IIndexCursor and suppress any Throwable thrown by the close call.
+ * This method must NEVER throw any Throwable
+ *
+ * @param cursor
+ * the cursor to close
+ * @param root
+ * the first exception encountered during release of resources
+ * @return the root Throwable if not null or a new Throwable if any was thrown, otherwise, it returns null
+ */
+ public static Throwable close(IIndexCursor cursor, Throwable root) {
+ if (cursor != null) {
+ try {
+ cursor.close();
+ } catch (Throwable th) { // NOSONAR Will be suppressed
+ try {
+ LOGGER.log(Level.WARN, "Failure closing a cursor", th);
+ } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable
+ // NOSONAR ignore logging failure
+ }
+ root = ExceptionUtils.suppress(root, th); // NOSONAR: Using the same variable is not bad in this context
+ }
+ }
+ return root;
+ }
+
+ public static void open(List<IIndexAccessor> accessors, IIndexCursor[] cursors, ISearchPredicate pred)
+ throws HyracksDataException {
+ int opened = 0;
+ try {
+ for (int i = 0; i < cursors.length; i++) {
+ if (accessors.get(i) != null) {
+ accessors.get(i).search(cursors[i], pred);
+ }
+ opened++;
+ }
+ } catch (Throwable th) { // NOSONAR: Much catch all failures
+ for (int j = 0; j < opened; j++) {
+ IndexCursorUtils.close(cursors[j], th);
+ }
+ throw HyracksDataException.create(th);
+ }
+ }
+
+ public static void open(IIndexAccessor[] accessors, IIndexCursor[] cursors, ISearchPredicate pred)
+ throws HyracksDataException {
+ int opened = 0;
+ try {
+ for (int i = 0; i < accessors.length; i++) {
+ if (accessors[i] != null) {
+ accessors[i].search(cursors[i], pred);
+ }
+ opened++;
+ }
+ } catch (Throwable th) { // NOSONAR: Much catch all failures
+ for (int j = 0; j < opened; j++) {
+ IndexCursorUtils.close(cursors[j], th);
+ }
+ throw HyracksDataException.create(th);
+ }
+ }
+
+ public static Throwable close(IIndexCursor[] cursors, Throwable th) {
+ for (int j = 0; j < cursors.length; j++) {
+ th = IndexCursorUtils.close(cursors[j], th); // NOSONAR: Using the same variable is cleaner in this context
+ }
+ return th;
+ }
+
+}