[ASTERIXDB-1764][STO] Ensure LOAD follow same lifecycle with merge/flush
- user model changes: no
- storage format changes: no
- interface change: no
Details:
- Ensure ioOperationCallbacks are properly called for bulk loaded
component
- Add Load type to LSMIOOperationType to distinguish bulk loaded
component from flush component
- Change ILSMIOOperationCallback to use LSMIOOperationType instead of
LSMOperationType, because this callback only targets at LSM IO
operaitons
Change-Id: Ib9ecf7292c5dbaf8638d159decc6e6faf79de58b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2131
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index 327c66e..142bcc5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -20,10 +20,10 @@
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent;
public class TestLsmBtreeIoOpCallbackFactory implements ILSMIOOperationCallbackFactory {
@@ -95,18 +95,18 @@
}
@Override
- public void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) {
+ public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
super.afterFinalize(opType, newComponent);
synchronized (INSTANCE) {
if (newComponent != null) {
if (newComponent == EmptyComponent.INSTANCE) {
- if (opType == LSMOperationType.FLUSH) {
+ if (opType == LSMIOOperationType.FLUSH) {
rollbackFlushes++;
} else {
rollbackMerges++;
}
} else {
- if (opType == LSMOperationType.FLUSH) {
+ if (opType == LSMIOOperationType.FLUSH) {
completedFlushes++;
} else {
completedMerges++;
@@ -119,8 +119,8 @@
}
}
- private void recordFailure(LSMOperationType opType) {
- if (opType == LSMOperationType.FLUSH) {
+ private void recordFailure(LSMIOOperationType opType) {
+ if (opType == LSMIOOperationType.FLUSH) {
failedFlushes++;
} else {
failedMerges++;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 5fcbac9..68f42e7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -30,10 +30,10 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMDiskComponentId;
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
@@ -67,8 +67,8 @@
}
@Override
- public void beforeOperation(LSMOperationType opType) throws HyracksDataException {
- if (opType == LSMOperationType.FLUSH) {
+ public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException {
+ if (opType == LSMIOOperationType.FLUSH) {
/*
* This method was called on the scheduleFlush operation.
* We set the lastLSN to the last LSN for the index (the LSN for the flush log)
@@ -87,9 +87,9 @@
}
@Override
- public void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) {
+ public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
// The operation was complete and the next I/O operation for the LSM index didn't start yet
- if (opType == LSMOperationType.FLUSH && newComponent != null) {
+ if (opType == LSMIOOperationType.FLUSH && newComponent != null) {
synchronized (this) {
flushRequested[readIndex] = false;
// if the component which just finished flushing is the component that will be modified next,
@@ -183,13 +183,13 @@
}
@Override
- public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents,
+ public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
ILSMDiskComponent newComponent) throws HyracksDataException {
//TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
if (newComponent != null) {
putLSNIntoMetadata(newComponent, oldComponents);
putComponentIdIntoMetadata(newComponent, oldComponents);
- if (opType == LSMOperationType.MERGE) {
+ if (opType == LSMIOOperationType.MERGE) {
// In case of merge, oldComponents are never null
LongPointable markerLsn =
LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(),
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
index 8d4cd51..d48227f 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
@@ -21,8 +21,8 @@
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.junit.Assert;
import org.mockito.Mockito;
@@ -38,17 +38,17 @@
//request to flush first component
callback.updateLastLSN(1);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
//request to flush second component
callback.updateLastLSN(2);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
} catch (Exception e) {
Assert.fail();
}
@@ -62,11 +62,11 @@
//request to flush first component
callback.updateLastLSN(1);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
//request to flush second component
callback.updateLastLSN(2);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
//request to flush first component again
//this call should fail
@@ -75,10 +75,10 @@
//the scheduleFlush request would fail this time
Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
} catch (Exception e) {
Assert.fail();
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
index ec1aba2..94ef0a3 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
@@ -21,8 +21,8 @@
import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.junit.Assert;
import org.mockito.Mockito;
@@ -38,17 +38,17 @@
//request to flush first component
callback.updateLastLSN(1);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
//request to flush second component
callback.updateLastLSN(2);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
} catch (Exception e) {
Assert.fail();
}
@@ -62,11 +62,11 @@
//request to flush first component
callback.updateLastLSN(1);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
//request to flush second component
callback.updateLastLSN(2);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
//request to flush first component again
//this call should fail
@@ -75,10 +75,10 @@
//the scheduleFlush request would fail this time
Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
} catch (Exception e) {
Assert.fail();
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
index 1711bc2..b213da0 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
@@ -21,8 +21,8 @@
import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.junit.Assert;
import org.mockito.Mockito;
@@ -38,17 +38,17 @@
//request to flush first component
callback.updateLastLSN(1);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
//request to flush second component
callback.updateLastLSN(2);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
} catch (Exception e) {
Assert.fail();
}
@@ -62,11 +62,11 @@
//request to flush first component
callback.updateLastLSN(1);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
//request to flush second component
callback.updateLastLSN(2);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
//request to flush first component again
//this call should fail
@@ -75,10 +75,10 @@
//the scheduleFlush request would fail this time
Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
} catch (Exception e) {
Assert.fail();
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
index bb33f3d..df26ef9 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
@@ -21,8 +21,8 @@
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.junit.Assert;
import org.mockito.Mockito;
@@ -38,17 +38,17 @@
//request to flush first component
callback.updateLastLSN(1);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
//request to flush second component
callback.updateLastLSN(2);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
} catch (Exception e) {
Assert.fail();
}
@@ -62,11 +62,11 @@
//request to flush first component
callback.updateLastLSN(1);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
//request to flush second component
callback.updateLastLSN(2);
- callback.beforeOperation(LSMOperationType.FLUSH);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
//request to flush first component again
//this call should fail
@@ -75,10 +75,10 @@
//the scheduleFlush request would fail this time
Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
} catch (Exception e) {
Assert.fail();
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index cb81b64..cee20ce 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -34,8 +34,8 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
/**
@@ -225,7 +225,8 @@
for (int i = diskComponents.length - 1; i >= 0; i--) {
// start from the oldest component to the newest component
if (diskComponents[i] != null && diskComponents[i].getComponentSize() > 0) {
- secondaryIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, diskComponents[i]);
+ secondaryIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.FLUSH, null,
+ diskComponents[i]);
// setting component id has to be place between afterOperation and addBulkLoadedComponent,
// since afterOperation would set a flush component id (but it's not invalid)
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index 6a092e3..7dc5939 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -50,7 +51,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
import org.apache.hyracks.storage.am.lsm.common.impls.ExternalIndexHarness;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
@@ -288,7 +288,7 @@
}
if (flushOnExit) {
BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback);
- cb.afterFinalize(LSMOperationType.FLUSH, null);
+ cb.afterFinalize(LSMIOOperationType.FLUSH, null);
}
for (ILSMDiskComponent c : diskComponents) {
c.deactivateAndPurge();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index 9740631..c495b69 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -44,6 +44,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -53,7 +54,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.impls.ExternalIndexHarness;
@@ -432,7 +432,7 @@
throw new HyracksDataException("Failed to deactivate the index since it is already deactivated.");
}
if (flushOnExit) {
- ioOpCallback.afterFinalize(LSMOperationType.FLUSH, null);
+ ioOpCallback.afterFinalize(LSMIOOperationType.FLUSH, null);
}
// Even though, we deactivate the index, we don't exit components or
// modify any of the lists to make sure they
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index c2ae786..ff32628 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -29,9 +29,10 @@
/**
* Represents the io operation type
*/
- enum LSMIOOpertionType {
+ enum LSMIOOperationType {
FLUSH,
- MERGE
+ MERGE,
+ LOAD
}
/**
@@ -52,7 +53,7 @@
/**
* @return the operation type
*/
- LSMIOOpertionType getIOOpertionType();
+ LSMIOOperationType getIOOpertionType();
@Override
Boolean call() throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
index 0323026..e122fd4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
public interface ILSMIOOperationCallback {
@@ -29,7 +30,7 @@
* (i.e. IO operations could be flush or merge operations.)
* For flush, this is called immediately before switching the current memory component pointer
*/
- void beforeOperation(LSMOperationType opType) throws HyracksDataException;
+ void beforeOperation(LSMIOOperationType opType) throws HyracksDataException;
/**
* This method is called on an IO operation sometime after the operation was completed.
@@ -42,7 +43,7 @@
* @param newComponent
* @throws HyracksDataException
*/
- void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMDiskComponent newComponent)
+ void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents, ILSMDiskComponent newComponent)
throws HyracksDataException;
/**
@@ -53,7 +54,7 @@
* @param newComponent
* @throws HyracksDataException
*/
- void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException;
+ void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException;
/**
* This method is called when a memory component is recycled
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 396cb77..2b2fe0d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -48,6 +48,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -397,6 +398,7 @@
public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
throws HyracksDataException {
+ ioOpCallback.beforeOperation(LSMIOOperationType.LOAD);
return new LSMIndexDiskComponentBulkLoader(this, fillLevel, verifyInput, numElementsHint);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
index 438bb0b..a439ace 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
@@ -31,7 +31,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOpertionType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
public class AsynchronousScheduler implements ILSMIOOperationScheduler {
@@ -60,7 +60,7 @@
super.afterExecute(r, t);
LSMIOOperationTask<Boolean> task = (LSMIOOperationTask<Boolean>) r;
ILSMIOOperation executedOp = task.getOperation();
- if (executedOp.getIOOpertionType() == LSMIOOpertionType.FLUSH) {
+ if (executedOp.getIOOpertionType() == LSMIOOperationType.FLUSH) {
String id = executedOp.getIndexIdentifier();
synchronized (this) {
runningFlushOperations.remove(id);
@@ -84,9 +84,9 @@
@Override
public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException {
- if (operation.getIOOpertionType() == LSMIOOpertionType.MERGE) {
+ if (operation.getIOOpertionType() == LSMIOOperationType.MERGE) {
executor.submit(operation);
- } else if (operation.getIOOpertionType() == LSMIOOpertionType.FLUSH) {
+ } else if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
String id = operation.getIndexIdentifier();
synchronized (executor) {
if (runningFlushOperations.containsKey(id)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
index 8d0395f..12dbb46 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
@@ -23,9 +23,9 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
public class BlockingIOOperationCallbackWrapper implements ILSMIOOperationCallback {
@@ -45,18 +45,18 @@
}
@Override
- public void beforeOperation(LSMOperationType opType) throws HyracksDataException {
+ public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException {
wrappedCallback.beforeOperation(opType);
}
@Override
- public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents,
+ public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
ILSMDiskComponent newComponent) throws HyracksDataException {
wrappedCallback.afterOperation(opType, oldComponents, newComponent);
}
@Override
- public synchronized void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent)
+ public synchronized void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent)
throws HyracksDataException {
wrappedCallback.afterFinalize(opType, newComponent);
notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
index 2f4dcc2..20f9f6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
@@ -106,7 +107,7 @@
// Check if there is any action that is needed to be taken based on the operation type
switch (opType) {
case MERGE:
- lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.MERGE);
+ lsmIndex.getIOOperationCallback().beforeOperation(LSMIOOperationType.MERGE);
default:
break;
}
@@ -207,7 +208,7 @@
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
- callback.afterFinalize(LSMOperationType.MERGE, null);
+ callback.afterFinalize(LSMIOOperationType.MERGE, null);
return;
}
lsmIndex.scheduleMerge(ctx, callback);
@@ -220,7 +221,7 @@
if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
// If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then
// whenever the current merge has finished, it will schedule the full merge again.
- callback.afterFinalize(LSMOperationType.MERGE, null);
+ callback.afterFinalize(LSMIOOperationType.MERGE, null);
return;
}
fullMergeIsRequested.set(false);
@@ -236,11 +237,11 @@
ILSMDiskComponent newComponent = null;
try {
newComponent = lsmIndex.merge(operation);
- operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent);
+ operation.getCallback().afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent);
newComponent.markAsValid(lsmIndex.isDurable());
} finally {
exitComponents(ctx, LSMOperationType.MERGE, newComponent, false);
- operation.getCallback().afterFinalize(LSMOperationType.MERGE, newComponent);
+ operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent);
}
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Finished the merge operation for index: " + lsmIndex);
@@ -300,7 +301,7 @@
@Override
public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
- callback.afterFinalize(LSMOperationType.FLUSH, null);
+ callback.afterFinalize(LSMIOOperationType.FLUSH, null);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
index 7b7f950..e809925 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
@@ -60,8 +60,8 @@
}
@Override
- public LSMIOOpertionType getIOOpertionType() {
- return LSMIOOpertionType.FLUSH;
+ public LSMIOOperationType getIOOpertionType() {
+ return LSMIOOperationType.FLUSH;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 00a29bc..48b6d8f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -44,6 +44,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
@@ -193,7 +194,7 @@
// Check if there is any action that is needed to be taken based on the operation type
switch (opType) {
case FLUSH:
- lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.FLUSH);
+ lsmIndex.getIOOperationCallback().beforeOperation(LSMIOOperationType.FLUSH);
// Changing the flush status should *always* precede changing the mutable component.
lsmIndex.changeFlushStatusForCurrentMutableCompoent(false);
lsmIndex.changeMutableComponent();
@@ -202,7 +203,7 @@
opTracker.notifyAll();
break;
case MERGE:
- lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.MERGE);
+ lsmIndex.getIOOperationCallback().beforeOperation(LSMIOOperationType.MERGE);
break;
default:
break;
@@ -504,7 +505,7 @@
public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) {
- callback.afterFinalize(LSMOperationType.FLUSH, null);
+ callback.afterFinalize(LSMIOOperationType.FLUSH, null);
return;
}
lsmIndex.scheduleFlush(ctx, callback);
@@ -520,7 +521,7 @@
boolean failedOperation = false;
try {
newComponent = lsmIndex.flush(operation);
- operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent);
+ operation.getCallback().afterOperation(LSMIOOperationType.FLUSH, null, newComponent);
newComponent.markAsValid(lsmIndex.isDurable());
} catch (Throwable e) {
failedOperation = true;
@@ -530,7 +531,7 @@
throw e;
} finally {
exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation);
- operation.getCallback().afterFinalize(LSMOperationType.FLUSH, newComponent);
+ operation.getCallback().afterFinalize(LSMIOOperationType.FLUSH, newComponent);
}
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Finished the flush operation for index: " + lsmIndex);
@@ -541,7 +542,7 @@
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
- callback.afterFinalize(LSMOperationType.MERGE, null);
+ callback.afterFinalize(LSMIOOperationType.MERGE, null);
return;
}
lsmIndex.scheduleMerge(ctx, callback);
@@ -554,7 +555,7 @@
if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
// If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then
// whenever the current merge has finished, it will schedule the full merge again.
- callback.afterFinalize(LSMOperationType.MERGE, null);
+ callback.afterFinalize(LSMIOOperationType.MERGE, null);
return;
}
fullMergeIsRequested.set(false);
@@ -571,7 +572,7 @@
boolean failedOperation = false;
try {
newComponent = lsmIndex.merge(operation);
- operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent);
+ operation.getCallback().afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent);
newComponent.markAsValid(lsmIndex.isDurable());
} catch (Throwable e) {
failedOperation = true;
@@ -596,7 +597,7 @@
// 8. completeOperation (decrement the numOfIoOperations)
opTracker.completeOperation(lsmIndex, LSMOperationType.MERGE, ctx.getSearchOperationCallback(),
ctx.getModificationCallback());
- operation.getCallback().afterFinalize(LSMOperationType.MERGE, newComponent);
+ operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent);
}
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Finished the merge operation for index: " + lsmIndex);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 8befee1..08b8bb6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -21,7 +21,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
@@ -46,19 +46,28 @@
@Override
public void end() throws HyracksDataException {
- componentBulkLoader.end();
- if (component.getComponentSize() > 0) {
- //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc).
- //then after operation should be called from harness as well
- //https://issues.apache.org/jira/browse/ASTERIXDB-1764
- lsmIndex.getIOOperationCallback().afterOperation(LSMOperationType.FLUSH, null, component);
- lsmIndex.getLsmHarness().addBulkLoadedComponent(component);
+ try {
+ componentBulkLoader.end();
+ if (component.getComponentSize() > 0) {
+ //TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc).
+ //then after operation should be called from harness as well
+ //https://issues.apache.org/jira/browse/ASTERIXDB-1764
+ lsmIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.LOAD, null, component);
+ lsmIndex.getLsmHarness().addBulkLoadedComponent(component);
+ }
+ } finally {
+ lsmIndex.getIOOperationCallback().afterFinalize(LSMIOOperationType.LOAD, component);
}
}
@Override
public void abort() throws HyracksDataException {
- componentBulkLoader.abort();
+ try {
+ componentBulkLoader.abort();
+ lsmIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.LOAD, null, null);
+ } finally {
+ lsmIndex.getIOOperationCallback().afterFinalize(LSMIOOperationType.LOAD, null);
+ }
}
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
index c83d534..e16da5b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MergeOperation.java
@@ -47,8 +47,8 @@
}
@Override
- public LSMIOOpertionType getIOOpertionType() {
- return LSMIOOpertionType.MERGE;
+ public LSMIOOperationType getIOOpertionType() {
+ return LSMIOOperationType.MERGE;
}
public IIndexCursor getCursor() {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
index da8bc46..09ca553 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
@@ -23,11 +23,11 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
public enum NoOpIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
INSTANCE;
@@ -44,18 +44,19 @@
}
@Override
- public void beforeOperation(LSMOperationType opType) throws HyracksDataException {
+ public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException {
// Do nothing.
}
@Override
- public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents,
+ public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
ILSMDiskComponent newComponent) throws HyracksDataException {
// Do nothing.
}
@Override
- public void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException {
+ public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent)
+ throws HyracksDataException {
// Do nothing.
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java
index 0f3c90f..238e915 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java
@@ -23,9 +23,9 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
/**
* This class is for testing. It's basically a way to get the new/old component info from the
@@ -38,19 +38,19 @@
private ILSMDiskComponent newComponent = null;
@Override
- public void beforeOperation(LSMOperationType opType) throws HyracksDataException {
+ public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException {
// Not interested in this
}
@Override
- public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents,
+ public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
ILSMDiskComponent newComponent) throws HyracksDataException {
this.oldComponents = oldComponents;
this.newComponent = newComponent;
}
@Override
- public synchronized void afterFinalize(LSMOperationType opType, ILSMDiskComponent newComponent)
+ public synchronized void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent)
throws HyracksDataException {
// Redundant info from after
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
index 88f3231..4cfc3b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
@@ -35,7 +35,7 @@
static final Logger LOGGER = Logger.getLogger(TracedIOOperation.class.getName());
protected final ILSMIOOperation ioOp;
- private final LSMIOOpertionType ioOpType;
+ private final LSMIOOperationType ioOpType;
private final ITracer tracer;
private final long traceCategory;
@@ -80,7 +80,7 @@
}
@Override
- public LSMIOOpertionType getIOOpertionType() {
+ public LSMIOOperationType getIOOpertionType() {
return ioOpType;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
index 0f606ed..ca24b13 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -50,7 +51,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
import org.apache.hyracks.storage.am.lsm.common.impls.ExternalIndexHarness;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
@@ -328,7 +328,7 @@
if (flushOnExit) {
BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback);
- cb.afterFinalize(LSMOperationType.FLUSH, null);
+ cb.afterFinalize(LSMIOOperationType.FLUSH, null);
}
for (ILSMDiskComponent c : diskComponents) {