[ASTERIXDB-2045][STO] Do Not Wait For Lagging Merge on Failed Flush
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Failed flush operations shouldn't wait for lagging merges
since they won't add any new disk components.
- Use logger to log exceptions.
Change-Id: I915e993a76d5c692a276b1d7f3426a25f910cf46
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1947
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index d0dc4b3..35c93ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -120,6 +120,7 @@
// There is only a single component. There is nothing to merge.
return false;
}
+ break;
default:
break;
}
@@ -161,7 +162,9 @@
}
entranceSuccessful = numEntered == components.size();
} catch (Throwable e) {
- e.printStackTrace();
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, opType.name() + " failed to enter components on " + lsmIndex, e);
+ }
throw e;
} finally {
if (!entranceSuccessful) {
@@ -223,12 +226,8 @@
*/
if (opType == LSMOperationType.FLUSH) {
opTracker.notifyAll();
- while (mergePolicy.isMergeLagging(lsmIndex)) {
- try {
- opTracker.wait();
- } catch (InterruptedException e) {
- //ignore
- }
+ if (!failedOperation) {
+ waitForLaggingMerge();
}
} else if (opType == LSMOperationType.MERGE) {
opTracker.notifyAll();
@@ -274,7 +273,7 @@
switch (opType) {
case FLUSH:
// newComponent is null if the flush op. was not performed.
- if (newComponent != null) {
+ if (!failedOperation && newComponent != null) {
lsmIndex.addDiskComponent(newComponent);
if (replicationEnabled) {
componentsToBeReplicated.clear();
@@ -286,7 +285,7 @@
break;
case MERGE:
// newComponent is null if the merge op. was not performed.
- if (newComponent != null) {
+ if (!failedOperation && newComponent != null) {
lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
if (replicationEnabled) {
componentsToBeReplicated.clear();
@@ -300,7 +299,9 @@
break;
}
} catch (Throwable e) {
- e.printStackTrace();
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, e.getMessage(), e);
+ }
throw e;
} finally {
if (failedOperation && (opType == LSMOperationType.MODIFICATION
@@ -351,7 +352,9 @@
((AbstractLSMDiskComponent) c).destroy();
}
} catch (Throwable e) {
- LOGGER.log(Level.WARNING, "Failure scheduling replication or destroying merged component", e);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Failure scheduling replication or destroying merged component", e);
+ }
throw e;
}
}
@@ -456,7 +459,7 @@
try {
exitComponents(ctx, LSMOperationType.SEARCH, null, false);
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
@@ -512,7 +515,9 @@
lsmIndex.markAsValid(newComponent);
} catch (Throwable e) {
failedOperation = true;
- e.printStackTrace();
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, "Flush failed on " + lsmIndex, e);
+ }
throw e;
} finally {
exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation);
@@ -561,7 +566,9 @@
lsmIndex.markAsValid(newComponent);
} catch (Throwable e) {
failedOperation = true;
- LOGGER.log(Level.SEVERE, "Failed merge operation on " + lsmIndex, e);
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, "Failed merge operation on " + lsmIndex, e);
+ }
throw e;
} finally {
exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation);
@@ -616,12 +623,10 @@
@Override
public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> lsmComponents,
boolean bulkload, LSMOperationType opType) throws HyracksDataException {
-
//enter the LSM components to be replicated to prevent them from being deleted until they are replicated
if (!getAndEnterComponents(ctx, LSMOperationType.REPLICATE, false)) {
return;
}
-
lsmIndex.scheduleReplication(ctx, lsmComponents, bulkload, ReplicationOperation.REPLICATE, opType);
}
@@ -717,6 +722,30 @@
throw HyracksDataException.create(ErrorCode.CANNOT_MODIFY_INDEX_DISK_IS_FULL);
}
+ /**
+ * Waits for any lagging merge operations to finish to avoid breaking
+ * the merge policy (i.e. adding a new disk component can make the
+ * number of mergable immutable components > maxToleranceComponentCount
+ * by the merge policy)
+ *
+ * @throws HyracksDataException
+ */
+ private void waitForLaggingMerge() throws HyracksDataException {
+ synchronized (opTracker) {
+ while (mergePolicy.isMergeLagging(lsmIndex)) {
+ try {
+ opTracker.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Ignoring interrupt while waiting for lagging merge on " + lsmIndex,
+ e);
+ }
+ }
+ }
+ }
+ }
+
@Override
public String toString() {
return getClass().getSimpleName() + ":" + lsmIndex;