Fixing LSMHarness issue.
The following commits from your working branch will be included:
commit 4a2a16f16df99cbf29ac53bf7009e2dc07bdbb26
Author: hubailmor@gmail.com <mhubail@uci.edu>
Date: Fri Apr 24 17:24:02 2015 -0700
Fixing LSMHarness issue
Change-Id: I7c1b3e8283fc3a661c80202cc896a42a1b322416
Reviewed-on: https://asterix-gerrit.ics.uci.edu/251
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
index 5765f71..55ba584 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
@@ -30,7 +30,9 @@
@Override
public boolean threadEnter(LSMOperationType opType, boolean isMutableComponent) {
- assert state != ComponentState.INACTIVE;
+ if (state == ComponentState.INACTIVE) {
+ throw new IllegalStateException("Trying to enter an inactive disk component");
+ }
switch (opType) {
case FORCE_MODIFICATION:
@@ -75,7 +77,10 @@
default:
throw new UnsupportedOperationException("Unsupported operation " + opType);
}
- assert readerCount > -1;
+
+ if (readerCount <= -1) {
+ throw new IllegalStateException("Invalid LSM disk component readerCount: " + readerCount);
+ }
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index ee03570..7717a2b 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -248,19 +248,23 @@
public String toString() {
return "LSMIndex [" + fileManager.getBaseDir() + "]";
}
-
+
@Override
public boolean hasMemoryComponents() {
return true;
}
-
+
@Override
public boolean isCurrentMutableComponentEmpty() throws HyracksDataException {
//check if the current memory component has been modified
return !((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).isModified();
}
-
- public void makeCurrentMutableComponentUnWritable() throws HyracksDataException {
- ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).setState(ComponentState.READABLE_UNWRITABLE);
+
+ public void setCurrentMutableComponentState(ComponentState componentState) {
+ ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).setState(componentState);
+ }
+
+ public ComponentState getCurrentMutableComponentState() {
+ return ((AbstractMemoryLSMComponent) memoryComponents.get(currentMutableComponentId.get())).getState();
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
index b86e98f..76694bb 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
@@ -92,7 +92,10 @@
break;
case FLUSH:
if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE) {
- assert writerCount == 0;
+
+ if (writerCount != 0) {
+ throw new IllegalStateException("Trying to flush when writerCount != 0");
+ }
state = ComponentState.READABLE_UNWRITABLE_FLUSHING;
readerCount++;
} else {
@@ -131,7 +134,9 @@
}
break;
case FLUSH:
- assert state == ComponentState.READABLE_UNWRITABLE_FLUSHING;
+ if (state != ComponentState.READABLE_UNWRITABLE_FLUSHING) {
+ throw new IllegalStateException("Flush sees an illegal LSM memory compoenent state: " + state);
+ }
readerCount--;
if (readerCount == 0) {
state = ComponentState.INACTIVE;
@@ -142,7 +147,10 @@
default:
throw new UnsupportedOperationException("Unsupported operation " + opType);
}
- assert readerCount > -1 && writerCount > -1;
+
+ if (readerCount <= -1 || writerCount <= -1) {
+ throw new IllegalStateException("Invalid reader or writer count " + readerCount + " - " + writerCount);
+ }
}
public boolean isReadable() {
@@ -161,7 +169,7 @@
public ComponentState getState() {
return state;
}
-
+
public void setState(ComponentState state) {
this.state = state;
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index d26854f..9f66f63 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -27,6 +27,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
@@ -61,7 +62,10 @@
case FLUSH:
ILSMComponent flushingComponent = ctx.getComponentHolder().get(0);
if (!((AbstractMemoryLSMComponent) flushingComponent).isModified()) {
- // The mutable component has not been modified by any writer. There is nothing to flush.
+ //The mutable component has not been modified by any writer. There is nothing to flush.
+ //since the component is empty, set its state back to READABLE_WRITABLE
+ ((AbstractLSMIndex) lsmIndex)
+ .setCurrentMutableComponentState(ComponentState.READABLE_WRITABLE);
return false;
}
break;
@@ -104,6 +108,9 @@
numEntered++;
}
entranceSuccessful = numEntered == components.size();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
} finally {
if (!entranceSuccessful) {
int i = 0;
@@ -197,8 +204,13 @@
default:
break;
}
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+
} finally {
- if (failedOperation && (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION)) {
+ if (failedOperation
+ && (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION)) {
//When the operation failed, completeOperation() method must be called
//in order to decrement active operation count which was incremented in beforeOperation() method.
opTracker.completeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
@@ -292,6 +304,10 @@
newComponent = lsmIndex.flush(operation);
operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent);
lsmIndex.markAsValid(newComponent);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+
} finally {
exitComponents(ctx, LSMOperationType.FLUSH, newComponent, false);
operation.getCallback().afterFinalize(LSMOperationType.FLUSH, newComponent);
@@ -337,6 +353,10 @@
newComponent = lsmIndex.merge(operation);
operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent);
lsmIndex.markAsValid(newComponent);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+
} finally {
exitComponents(ctx, LSMOperationType.MERGE, newComponent, false);
operation.getCallback().afterFinalize(LSMOperationType.MERGE, newComponent);
@@ -349,8 +369,10 @@
@Override
public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException {
lsmIndex.markAsValid(c);
- lsmIndex.addComponent(c);
- mergePolicy.diskComponentAdded(lsmIndex, false);
+ synchronized (opTracker) {
+ lsmIndex.addComponent(c);
+ mergePolicy.diskComponentAdded(lsmIndex, false);
+ }
}
@Override