[NO ISSUE][STO] Improve the LSMIOOperationCallback interface.
- user model changes: no
- storage format changes: no
- interface changes: yes
+ ILSMIndexOperationContext.getIoOperationType()
+ ILSMIndexOperationContext.getNewComponent()
* before, after, and finalize
calls of ILSMIOOperationCallback now take
ILSMIndexOperationContext as a parameter
Details:
- Before, some calls to ILSMIOOperationCallback
take just an enum LSMIOOperationType, some of them
take an enum and a component object. These sometimes don't
provide enough information to different implementations of
the callback that might be interested in more than that.
- Having the operation context object passed allow for
better exchange of information between different callers
and callees throughout the IO operation.
Change-Id: Ib7120c40a1a2256ed528dfd2e5853db9dba247c6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2455
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index c69ffe5..5852ad9 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -18,20 +18,17 @@
*/
package org.apache.asterix.test.dataflow;
-import java.util.List;
-
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent;
@@ -108,42 +105,40 @@
}
@Override
- public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException {
+ public void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
lsmBtree.beforeIoOperationCalled();
- super.beforeOperation(opType);
+ super.beforeOperation(opCtx);
lsmBtree.beforeIoOperationReturned();
}
@Override
- public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
- ILSMDiskComponent newComponent) throws HyracksDataException {
+ public void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
lsmBtree.afterIoOperationCalled();
- super.afterOperation(opType, oldComponents, newComponent);
+ super.afterOperation(opCtx);
lsmBtree.afterIoOperationReturned();
}
@Override
- public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent)
- throws HyracksDataException {
+ public void afterFinalize(ILSMIndexOperationContext opCtx) throws HyracksDataException {
lsmBtree.afterIoFinalizeCalled();
- super.afterFinalize(opType, newComponent);
+ super.afterFinalize(opCtx);
synchronized (TestLsmBtreeIoOpCallbackFactory.this) {
- if (newComponent != null) {
- if (newComponent == EmptyComponent.INSTANCE) {
- if (opType == LSMIOOperationType.FLUSH) {
+ if (opCtx.getNewComponent() != null) {
+ if (opCtx.getNewComponent() == EmptyComponent.INSTANCE) {
+ if (opCtx.getIoOperationType() == LSMIOOperationType.FLUSH) {
rollbackFlushes++;
} else {
rollbackMerges++;
}
} else {
- if (opType == LSMIOOperationType.FLUSH) {
+ if (opCtx.getIoOperationType() == LSMIOOperationType.FLUSH) {
completedFlushes++;
} else {
completedMerges++;
}
}
} else {
- recordFailure(opType);
+ recordFailure(opCtx.getIoOperationType());
}
TestLsmBtreeIoOpCallbackFactory.this.notifyAll();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 412981c..b9f0cc7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
@@ -90,8 +91,8 @@
}
@Override
- public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException {
- if (opType == LSMIOOperationType.FLUSH) {
+ public void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+ if (opCtx.getIoOperationType() == LSMIOOperationType.FLUSH) {
/*
* This method was called on the scheduleFlush operation.
* We set the lastLSN to the last LSN for the index (the LSN for the flush log)
@@ -111,25 +112,25 @@
}
@Override
- public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
- ILSMDiskComponent newComponent) throws HyracksDataException {
+ public void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
//TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
- if (newComponent == null) {
+ if (opCtx.getNewComponent() == null) {
// failed operation. Nothing to do.
return;
}
- putLSNIntoMetadata(newComponent, oldComponents);
- putComponentIdIntoMetadata(opType, newComponent, oldComponents);
- componentLsnMap.put(newComponent.getId(), getComponentLSN(oldComponents));
- if (opType == LSMIOOperationType.MERGE) {
- if (oldComponents == null) {
+ putLSNIntoMetadata(opCtx.getNewComponent(), opCtx.getComponentsToBeMerged());
+ putComponentIdIntoMetadata(opCtx.getIoOperationType(), opCtx.getNewComponent(),
+ opCtx.getComponentsToBeMerged());
+ componentLsnMap.put(opCtx.getNewComponent().getId(), getComponentLSN(opCtx.getComponentsToBeMerged()));
+ if (opCtx.getIoOperationType() == LSMIOOperationType.MERGE) {
+ if (opCtx.getComponentsToBeMerged().isEmpty()) {
throw new IllegalStateException("Merge must have old components");
}
- LongPointable markerLsn =
- LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(),
+ LongPointable markerLsn = LongPointable.FACTORY
+ .createPointable(ComponentUtils.getLong(opCtx.getComponentsToBeMerged().get(0).getMetadata(),
ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND));
- newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
- } else if (opType == LSMIOOperationType.FLUSH) {
+ opCtx.getNewComponent().getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
+ } else if (opCtx.getIoOperationType() == LSMIOOperationType.FLUSH) {
// advance memory component indexes
synchronized (this) {
// we've already consumed the specified LSN/component id.
@@ -146,17 +147,18 @@
}
@Override
- public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException {
+ public void afterFinalize(ILSMIndexOperationContext opCtx) throws HyracksDataException {
// The operation was complete and the next I/O operation for the LSM index didn't start yet
- if (opType == LSMIOOperationType.FLUSH) {
+ if (opCtx.getIoOperationType() == LSMIOOperationType.FLUSH) {
hasFlushed = true;
- if (newComponent != null) {
- final Long lsn = componentLsnMap.remove(newComponent.getId());
+ if (opCtx.getNewComponent() != null) {
+ final Long lsn = componentLsnMap.remove(opCtx.getNewComponent().getId());
if (lsn == null) {
- throw new IllegalStateException("Unidentified flushed component: " + newComponent);
+ throw new IllegalStateException("Unidentified flushed component: " + opCtx.getNewComponent());
}
// empty component doesn't have any files
- final Optional<String> componentFile = newComponent.getLSMComponentPhysicalFiles().stream().findAny();
+ final Optional<String> componentFile =
+ opCtx.getNewComponent().getLSMComponentPhysicalFiles().stream().findAny();
if (componentFile.isPresent()) {
final ResourceReference ref = ResourceReference.of(componentFile.get());
final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName());
@@ -166,7 +168,7 @@
}
}
- private void putLSNIntoMetadata(ILSMDiskComponent newComponent, List<ILSMComponent> oldComponents)
+ private void putLSNIntoMetadata(ILSMDiskComponent newComponent, List<? extends ILSMComponent> oldComponents)
throws HyracksDataException {
newComponent.getMetadata().put(LSN_KEY, LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents)));
}
@@ -178,8 +180,9 @@
return pointable.getLength() == 0 ? INVALID : pointable.longValue();
}
- private ILSMComponentId getMergedComponentId(List<ILSMComponent> mergedComponents) throws HyracksDataException {
- if (mergedComponents == null || mergedComponents.isEmpty()) {
+ private ILSMComponentId getMergedComponentId(List<? extends ILSMComponent> mergedComponents)
+ throws HyracksDataException {
+ if (mergedComponents.isEmpty()) {
return null;
}
return LSMComponentIdUtils.union(mergedComponents.get(0).getId(),
@@ -188,7 +191,7 @@
}
private void putComponentIdIntoMetadata(LSMIOOperationType opType, ILSMDiskComponent newComponent,
- List<ILSMComponent> oldComponents) throws HyracksDataException {
+ List<? extends ILSMComponent> oldComponents) throws HyracksDataException {
// the id of flushed component is set when we copy the metadata of the memory component
if (opType == LSMIOOperationType.MERGE) {
ILSMComponentId componentId = getMergedComponentId(oldComponents);
@@ -242,7 +245,7 @@
}
public long getComponentLSN(List<? extends ILSMComponent> diskComponents) throws HyracksDataException {
- if (diskComponents == null) {
+ if (diskComponents.isEmpty()) {
// Implies a flush IO operation. --> moves the flush pointer
// Flush operation of an LSM index are executed sequentially.
synchronized (this) {
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
index 2ab5b4e..c03af40 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
@@ -19,6 +19,8 @@
package org.apache.asterix.test.ioopcallbacks;
+import java.util.Collections;
+
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
@@ -29,6 +31,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
@@ -49,24 +52,30 @@
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
mockIndexCheckpointManagerProvider());
+ ILSMIndexOperationContext firstOpCtx = new TestLSMIndexOperationContext(mockIndex);
//request to flush first component
callback.updateLastLSN(1);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
+ firstOpCtx.setIoOperationType(LSMIOOperationType.FLUSH);
+ callback.beforeOperation(firstOpCtx);
+ ILSMIndexOperationContext secondOpCtx = new TestLSMIndexOperationContext(mockIndex);
//request to flush second component
callback.updateLastLSN(2);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
+ secondOpCtx.setIoOperationType(LSMIOOperationType.FLUSH);
+ callback.beforeOperation(secondOpCtx);
- Assert.assertEquals(1, callback.getComponentLSN(null));
+ Assert.assertEquals(1, callback.getComponentLSN(Collections.emptyList()));
final ILSMDiskComponent diskComponent1 = mockDiskComponent();
- callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent1);
- callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent1);
+ firstOpCtx.setNewComponent(diskComponent1);
+ callback.afterOperation(firstOpCtx);
+ callback.afterFinalize(firstOpCtx);
- Assert.assertEquals(2, callback.getComponentLSN(null));
+ Assert.assertEquals(2, callback.getComponentLSN(Collections.emptyList()));
final ILSMDiskComponent diskComponent2 = mockDiskComponent();
- callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2);
- callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2);
+ secondOpCtx.setNewComponent(diskComponent2);
+ callback.afterOperation(secondOpCtx);
+ callback.afterFinalize(secondOpCtx);
}
@Test
@@ -74,17 +83,20 @@
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
-
LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
mockIndexCheckpointManagerProvider());
//request to flush first component
+ ILSMIndexOperationContext firstOpCtx = new TestLSMIndexOperationContext(mockIndex);
callback.updateLastLSN(1);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
+ firstOpCtx.setIoOperationType(LSMIOOperationType.FLUSH);
+ callback.beforeOperation(firstOpCtx);
//request to flush second component
+ ILSMIndexOperationContext secondOpCtx = new TestLSMIndexOperationContext(mockIndex);
callback.updateLastLSN(2);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
+ secondOpCtx.setIoOperationType(LSMIOOperationType.FLUSH);
+ callback.beforeOperation(secondOpCtx);
//request to flush first component again
//this call should fail
@@ -92,14 +104,16 @@
//there is no corresponding beforeOperation, since the first component is being flush
//the scheduleFlush request would fail this time
- Assert.assertEquals(1, callback.getComponentLSN(null));
+ Assert.assertEquals(1, callback.getComponentLSN(Collections.emptyList()));
final ILSMDiskComponent diskComponent1 = mockDiskComponent();
- callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent1);
- callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent1);
+ firstOpCtx.setNewComponent(diskComponent1);
+ callback.afterOperation(firstOpCtx);
+ callback.afterFinalize(firstOpCtx);
final ILSMDiskComponent diskComponent2 = mockDiskComponent();
- Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2);
- callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2);
+ secondOpCtx.setNewComponent(diskComponent2);
+ Assert.assertEquals(2, callback.getComponentLSN(Collections.emptyList()));
+ callback.afterOperation(secondOpCtx);
+ callback.afterFinalize(secondOpCtx);
}
@Test
@@ -111,33 +125,39 @@
LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
mockIndexCheckpointManagerProvider());
//request to flush first component
+ ILSMIndexOperationContext firstOpCtx = new TestLSMIndexOperationContext(mockIndex);
callback.updateLastLSN(1);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
+ firstOpCtx.setIoOperationType(LSMIOOperationType.FLUSH);
+ callback.beforeOperation(firstOpCtx);
//request to flush second component
+ ILSMIndexOperationContext secondOpCtx = new TestLSMIndexOperationContext(mockIndex);
callback.updateLastLSN(2);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
+ secondOpCtx.setIoOperationType(LSMIOOperationType.FLUSH);
+ callback.beforeOperation(secondOpCtx);
- Assert.assertEquals(1, callback.getComponentLSN(null));
+ Assert.assertEquals(1, callback.getComponentLSN(Collections.emptyList()));
// the first flush is finished, but has not finalized yet (in codebase, these two calls
// are not synchronized)
- callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
+ firstOpCtx.setNewComponent(mockDiskComponent());
+ callback.afterOperation(firstOpCtx);
//request to flush first component again
callback.updateLastLSN(3);
// the first flush is finalized (it may be called after afterOperation for a while)
- callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+ callback.afterFinalize(firstOpCtx);
// the second flush gets LSN 2
- Assert.assertEquals(2, callback.getComponentLSN(null));
+ Assert.assertEquals(2, callback.getComponentLSN(Collections.emptyList()));
// the second flush is finished
- callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
- callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+ secondOpCtx.setNewComponent(mockDiskComponent());
+ callback.afterOperation(secondOpCtx);
+ callback.afterFinalize(secondOpCtx);
// it should get new LSN 3
- Assert.assertEquals(3, callback.getComponentLSN(null));
+ Assert.assertEquals(3, callback.getComponentLSN(Collections.emptyList()));
}
@Test
@@ -179,14 +199,14 @@
// schedule a flush
idGenerator.refresh();
ILSMComponentId expectedId = idGenerator.getId();
-
callback.updateLastLSN(0);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
+ ILSMIndexOperationContext opCtx = new TestLSMIndexOperationContext(mockIndex);
+ opCtx.setIoOperationType(LSMIOOperationType.FLUSH);
+ callback.beforeOperation(opCtx);
callback.recycled(mockComponent, true);
-
- final ILSMDiskComponent diskComponent = mockDiskComponent();
- callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent);
- callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent);
+ opCtx.setNewComponent(mockDiskComponent());
+ callback.afterOperation(opCtx);
+ callback.afterFinalize(opCtx);
checkMemoryComponent(expectedId, mockComponent);
}
}
@@ -200,19 +220,19 @@
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
LSMBTreeIOOperationCallback callback =
new LSMBTreeIOOperationCallback(mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
-
ILSMComponentId id = idGenerator.getId();
callback.allocated(mockComponent);
checkMemoryComponent(id, mockComponent);
-
Mockito.when(mockIndex.isMemoryComponentsAllocated()).thenReturn(true);
-
for (int i = 0; i < 10; i++) {
idGenerator.refresh();
id = idGenerator.getId();
callback.updateLastLSN(0);
+ // Huh! There is no beforeOperation?
+ ILSMIndexOperationContext opCtx = new TestLSMIndexOperationContext(mockIndex);
+ opCtx.setIoOperationType(LSMIOOperationType.FLUSH);
callback.recycled(mockComponent, false);
- callback.afterFinalize(LSMIOOperationType.FLUSH, null);
+ callback.afterFinalize(opCtx);
checkMemoryComponent(id, mockComponent);
}
}
@@ -238,10 +258,12 @@
ILSMComponentId expectedId = idGenerator.getId();
callback.updateLastLSN(0);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
- final ILSMDiskComponent diskComponent = mockDiskComponent();
- callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent);
- callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent);
+ ILSMIndexOperationContext firstOpCtx = new TestLSMIndexOperationContext(mockIndex);
+ firstOpCtx.setIoOperationType(LSMIOOperationType.FLUSH);
+ callback.beforeOperation(firstOpCtx);
+ firstOpCtx.setNewComponent(mockDiskComponent());
+ callback.afterOperation(firstOpCtx);
+ callback.afterFinalize(firstOpCtx);
// another flush is to be scheduled before the component is recycled
idGenerator.refresh();
@@ -253,10 +275,12 @@
// schedule the next flush
callback.updateLastLSN(0);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
- final ILSMDiskComponent diskComponent2 = mockDiskComponent();
- callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2);
- callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2);
+ ILSMIndexOperationContext secondOpCtx = new TestLSMIndexOperationContext(mockIndex);
+ secondOpCtx.setIoOperationType(LSMIOOperationType.FLUSH);
+ callback.beforeOperation(secondOpCtx);
+ secondOpCtx.setNewComponent(mockDiskComponent());
+ callback.afterOperation(secondOpCtx);
+ callback.afterFinalize(secondOpCtx);
callback.recycled(mockComponent, true);
checkMemoryComponent(nextId, mockComponent);
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java
new file mode 100644
index 0000000..19536f6
--- /dev/null
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.ioopcallbacks;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
+import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class TestLSMIndexOperationContext implements ILSMIndexOperationContext {
+
+ private final ILSMIndex index;
+ private final List<ILSMComponent> componentHolder = new ArrayList<>();
+ private final List<ILSMDiskComponent> componentsToBeMerged = new ArrayList<>();
+ private final List<ILSMDiskComponent> componentsToBeReplicated = new ArrayList<>();
+ private boolean isAccessingComponents;
+ private IndexOperation op;
+ private LSMIOOperationType ioOperationType;
+ private ILSMDiskComponent newComponent;
+
+ public TestLSMIndexOperationContext(ILSMIndex index) {
+ this.index = index;
+ }
+
+ @Override
+ public void setOperation(IndexOperation newOp) throws HyracksDataException {
+ this.op = newOp;
+ }
+
+ @Override
+ public IndexOperation getOperation() {
+ return op;
+ }
+
+ @Override
+ public void reset() {
+ op = null;
+ componentHolder.clear();
+ componentsToBeMerged.clear();
+ componentsToBeReplicated.clear();
+ isAccessingComponents = false;
+ }
+
+ @Override
+ public void destroy() throws HyracksDataException {
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentHolder() {
+ return componentHolder;
+ }
+
+ @Override
+ public List<ILSMDiskComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
+
+ @Override
+ public ISearchOperationCallback getSearchOperationCallback() {
+ return NoOpOperationCallback.INSTANCE;
+ }
+
+ @Override
+ public IModificationOperationCallback getModificationCallback() {
+ return NoOpOperationCallback.INSTANCE;
+ }
+
+ @Override
+ public void setCurrentMutableComponentId(int currentMutableComponentId) {
+ }
+
+ @Override
+ public void setSearchPredicate(ISearchPredicate searchPredicate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ISearchPredicate getSearchPredicate() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<ILSMDiskComponent> getComponentsToBeReplicated() {
+ return componentsToBeReplicated;
+ }
+
+ @Override
+ public boolean isAccessingComponents() {
+ return isAccessingComponents;
+ }
+
+ @Override
+ public void setAccessingComponents(boolean accessingComponents) {
+ this.isAccessingComponents = accessingComponents;
+ }
+
+ @Override
+ public PermutingTupleReference getIndexTuple() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PermutingTupleReference getFilterTuple() {
+ return null;
+ }
+
+ @Override
+ public MultiComparator getFilterCmp() {
+ return null;
+ }
+
+ @Override
+ public ILSMIndex getIndex() {
+ return index;
+ }
+
+ @Override
+ public void logPerformanceCounters(int tupleCount) {
+ }
+
+ @Override
+ public void incrementEnterExitTime(long increment) {
+ }
+
+ @Override
+ public boolean isTracingEnabled() {
+ return false;
+ }
+
+ @Override
+ public LSMIOOperationType getIoOperationType() {
+ return ioOperationType;
+ }
+
+ @Override
+ public void setIoOperationType(LSMIOOperationType ioOpType) {
+ this.ioOperationType = ioOpType;
+ }
+
+ @Override
+ public ILSMDiskComponent getNewComponent() {
+ return newComponent;
+ }
+
+ @Override
+ public void setNewComponent(ILSMDiskComponent component) {
+ this.newComponent = component;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index 92d74d9..c0f7571 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -50,7 +50,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
-import org.apache.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
import org.apache.hyracks.storage.am.lsm.common.impls.ExternalIndexHarness;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
@@ -288,8 +287,9 @@
throw new HyracksDataException("Failed to deactivate the index since it is already deactivated.");
}
if (flushOnExit) {
- BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback);
- cb.afterFinalize(LSMIOOperationType.FLUSH, null);
+ ExternalBTreeOpContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, version);
+ opCtx.setIoOperationType(LSMIOOperationType.FLUSH);
+ ioOpCallback.afterFinalize(opCtx);
}
for (ILSMDiskComponent c : diskComponents) {
c.deactivateAndPurge();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index 6e06d37..1ba55f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.api.ITwoPCIndexBulkLoader;
+import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.AbstractLSMWithBloomFilterDiskComponent;
@@ -430,7 +431,9 @@
throw new HyracksDataException("Failed to deactivate the index since it is already deactivated.");
}
if (flushOnExit) {
- ioOpCallback.afterFinalize(LSMIOOperationType.FLUSH, null);
+ AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpIndexAccessParameters.INSTANCE);
+ opCtx.setIoOperationType(LSMIOOperationType.FLUSH);
+ ioOpCallback.afterFinalize(opCtx);
}
// Even though, we deactivate the index, we don't exit components or
// modify any of the lists to make sure they
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index f5ee23b..65e7f64 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -33,7 +33,8 @@
enum LSMIOOperationType {
FLUSH,
MERGE,
- LOAD
+ LOAD,
+ NOOP
}
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
index 8df872b..acc9e89 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
@@ -18,19 +18,16 @@
*/
package org.apache.hyracks.storage.am.lsm.common.api;
-import java.util.List;
-
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
public interface ILSMIOOperationCallback {
/**
* This method is called on an IO operation before the operation starts.
- * (i.e. IO operations could be flush or merge operations.)
+ * (i.e. IO operations could be flush, or merge operations.)
* For flush, this is called immediately before switching the current memory component pointer
*/
- void beforeOperation(LSMIOOperationType opType) throws HyracksDataException;
+ void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException;
/**
* This method is called on an IO operation sometime after the operation was completed.
@@ -39,22 +36,18 @@
* Copying content of metadata page from memory component to disk component should be done in this call
* Merging content of metadata pages from disk components to new disk component should be done in this call
*
- * @param oldComponents
- * @param newComponent
* @throws HyracksDataException
*/
- void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents, ILSMDiskComponent newComponent)
- throws HyracksDataException;
+ void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException;
/**
* This method is called on an IO operation when the operation needs any cleanup works
* regardless that the IO operation was executed or not. Once the IO operation is executed,
* this method should be called after ILSMIOOperationCallback.afterOperation() was called.
*
- * @param newComponent
* @throws HyracksDataException
*/
- void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException;
+ void afterFinalize(ILSMIndexOperationContext opCtx) throws HyracksDataException;
/**
* This method is called when a memory component is recycled
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index ec9124d..79b3262 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
@@ -82,4 +83,28 @@
* @return true if performance tracing is enabled, false otherwise
*/
boolean isTracingEnabled();
+
+ /**
+ * @return the IO Operation type associated with this context
+ */
+ LSMIOOperationType getIoOperationType();
+
+ /**
+ * Set the IO Operation type associated with this context
+ *
+ * @param ioOpType
+ */
+ void setIoOperationType(LSMIOOperationType ioOpType);
+
+ /**
+ * @return the new component produced by this operation if any, null otherwise
+ */
+ ILSMDiskComponent getNewComponent();
+
+ /**
+ * Set the new component produced by this operation
+ *
+ * @param component
+ */
+ void setNewComponent(ILSMDiskComponent component);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index fef5516..0368a09 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -369,6 +369,7 @@
AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpIndexAccessParameters.INSTANCE);
opCtx.setOperation(ctx.getOperation());
opCtx.getComponentHolder().addAll(mergingComponents);
+ opCtx.getComponentsToBeMerged().addAll(ctx.getComponentsToBeMerged());
ILSMDiskComponent firstComponent = (ILSMDiskComponent) mergingComponents.get(0);
ILSMDiskComponent lastComponent = (ILSMDiskComponent) mergingComponents.get(mergingComponents.size() - 1);
LSMComponentFileReferences mergeFileRefs = getMergeFileReferences(firstComponent, lastComponent);
@@ -406,8 +407,10 @@
public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
throws HyracksDataException {
- ioOpCallback.beforeOperation(LSMIOOperationType.LOAD);
- return new LSMIndexDiskComponentBulkLoader(this, fillLevel, verifyInput, numElementsHint);
+ AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpIndexAccessParameters.INSTANCE);
+ opCtx.setIoOperationType(LSMIOOperationType.LOAD);
+ ioOpCallback.beforeOperation(opCtx);
+ return new LSMIndexDiskComponentBulkLoader(this, opCtx, fillLevel, verifyInput, numElementsHint);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java
index 1b540b7..72c2b07 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
@@ -54,6 +55,8 @@
protected final ITracer tracer;
protected final long traceCategory;
private long enterExitTime = 0L;
+ private LSMIOOperationType ioOpType = LSMIOOperationType.NOOP;
+ private ILSMDiskComponent newDiskComponent;
public AbstractLSMIndexOperationContext(ILSMIndex index, int[] treeFields, int[] filterFields,
IBinaryComparatorFactory[] filterCmpFactories, ISearchOperationCallback searchCallback,
@@ -191,4 +194,24 @@
public ILSMIndex getIndex() {
return index;
}
+
+ @Override
+ public LSMIOOperationType getIoOperationType() {
+ return ioOpType;
+ }
+
+ @Override
+ public void setIoOperationType(LSMIOOperationType ioOpType) {
+ this.ioOpType = ioOpType;
+ }
+
+ @Override
+ public ILSMDiskComponent getNewComponent() {
+ return newDiskComponent;
+ }
+
+ @Override
+ public void setNewComponent(ILSMDiskComponent component) {
+ this.newDiskComponent = component;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
index e464231..042720c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
@@ -18,13 +18,9 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
-import java.util.List;
-
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
public class BlockingIOOperationCallbackWrapper implements ILSMIOOperationCallback {
@@ -45,20 +41,18 @@
}
@Override
- public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException {
- wrappedCallback.beforeOperation(opType);
+ public void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+ wrappedCallback.beforeOperation(opCtx);
}
@Override
- public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
- ILSMDiskComponent newComponent) throws HyracksDataException {
- wrappedCallback.afterOperation(opType, oldComponents, newComponent);
+ public void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+ wrappedCallback.afterOperation(opCtx);
}
@Override
- public synchronized void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent)
- throws HyracksDataException {
- wrappedCallback.afterFinalize(opType, newComponent);
+ public synchronized void afterFinalize(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+ wrappedCallback.afterFinalize(opCtx);
notifyAll();
notified = true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
index dcac219..a992c5e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
@@ -218,4 +219,24 @@
public void destroy() throws HyracksDataException {
// No Op.. Nothing to destroy
}
+
+ @Override
+ public LSMIOOperationType getIoOperationType() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setIoOperationType(LSMIOOperationType ioOpType) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ILSMDiskComponent getNewComponent() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setNewComponent(ILSMDiskComponent component) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
index d9d3a07..aa54127 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
@@ -107,7 +107,8 @@
// Check if there is any action that is needed to be taken based on the operation type
switch (opType) {
case MERGE:
- lsmIndex.getIOOperationCallback().beforeOperation(LSMIOOperationType.MERGE);
+ ctx.setIoOperationType(LSMIOOperationType.MERGE);
+ lsmIndex.getIOOperationCallback().beforeOperation(ctx);
default:
break;
}
@@ -208,7 +209,8 @@
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
- callback.afterFinalize(LSMIOOperationType.MERGE, null);
+ ctx.setIoOperationType(LSMIOOperationType.MERGE);
+ callback.afterFinalize(ctx);
return;
}
lsmIndex.scheduleMerge(ctx, callback);
@@ -221,7 +223,8 @@
if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
// If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then
// whenever the current merge has finished, it will schedule the full merge again.
- callback.afterFinalize(LSMIOOperationType.MERGE, null);
+ ctx.setIoOperationType(LSMIOOperationType.MERGE);
+ callback.afterFinalize(ctx);
return;
}
fullMergeIsRequested.set(false);
@@ -237,11 +240,13 @@
ILSMDiskComponent newComponent = null;
try {
newComponent = lsmIndex.merge(operation);
- operation.getCallback().afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent);
+ ctx.setNewComponent(newComponent);
+ ctx.setIoOperationType(LSMIOOperationType.MERGE);
+ operation.getCallback().afterOperation(ctx);
newComponent.markAsValid(lsmIndex.isDurable());
} finally {
exitComponents(ctx, LSMOperationType.MERGE, newComponent, false);
- operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent);
+ operation.getCallback().afterFinalize(ctx);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Finished the merge operation for index: " + lsmIndex);
@@ -301,7 +306,8 @@
@Override
public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
- callback.afterFinalize(LSMIOOperationType.FLUSH, null);
+ ctx.setIoOperationType(LSMIOOperationType.FLUSH);
+ callback.afterFinalize(ctx);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index fa3093c..eed8f6e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -221,7 +221,8 @@
// Check if there is any action that is needed to be taken based on the operation type
switch (opType) {
case FLUSH:
- lsmIndex.getIOOperationCallback().beforeOperation(LSMIOOperationType.FLUSH);
+ ctx.setIoOperationType(LSMIOOperationType.FLUSH);
+ lsmIndex.getIOOperationCallback().beforeOperation(ctx);
// Changing the flush status should *always* precede changing the mutable component.
lsmIndex.changeFlushStatusForCurrentMutableCompoent(false);
lsmIndex.changeMutableComponent();
@@ -230,7 +231,8 @@
opTracker.notifyAll(); // NOSONAR: Always called from a synchronized block
break;
case MERGE:
- lsmIndex.getIOOperationCallback().beforeOperation(LSMIOOperationType.MERGE);
+ ctx.setIoOperationType(LSMIOOperationType.MERGE);
+ lsmIndex.getIOOperationCallback().beforeOperation(ctx);
break;
default:
break;
@@ -549,7 +551,8 @@
public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) {
- callback.afterFinalize(LSMIOOperationType.FLUSH, null);
+ ctx.setIoOperationType(LSMIOOperationType.FLUSH);
+ callback.afterFinalize(ctx);
return;
}
lsmIndex.scheduleFlush(ctx, callback);
@@ -565,7 +568,9 @@
boolean failedOperation = false;
try {
newComponent = lsmIndex.flush(operation);
- operation.getCallback().afterOperation(LSMIOOperationType.FLUSH, null, newComponent);
+ ctx.setNewComponent(newComponent);
+ ctx.setIoOperationType(LSMIOOperationType.FLUSH);
+ operation.getCallback().afterOperation(ctx);
newComponent.markAsValid(lsmIndex.isDurable());
} catch (Throwable e) { // NOSONAR Log and re-throw
failedOperation = true;
@@ -575,7 +580,8 @@
throw e;
} finally {
exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation);
- operation.getCallback().afterFinalize(LSMIOOperationType.FLUSH, newComponent);
+ ctx.setIoOperationType(LSMIOOperationType.FLUSH);
+ operation.getCallback().afterFinalize(ctx);
}
} finally {
@@ -595,7 +601,8 @@
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
- callback.afterFinalize(LSMIOOperationType.MERGE, null);
+ ctx.setIoOperationType(LSMIOOperationType.MERGE);
+ callback.afterFinalize(ctx);
return;
}
lsmIndex.scheduleMerge(ctx, callback);
@@ -609,7 +616,8 @@
// If the merge cannot be scheduled because there is already an ongoing merge on
// subset/all of the components, then whenever the current merge has finished,
// it will schedule the full merge again.
- callback.afterFinalize(LSMIOOperationType.MERGE, null);
+ ctx.setIoOperationType(LSMIOOperationType.MERGE);
+ callback.afterFinalize(ctx);
return;
}
fullMergeIsRequested.set(false);
@@ -626,8 +634,9 @@
boolean failedOperation = false;
try {
newComponent = lsmIndex.merge(operation);
- operation.getCallback().afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(),
- newComponent);
+ ctx.setNewComponent(newComponent);
+ ctx.setIoOperationType(LSMIOOperationType.MERGE);
+ operation.getCallback().afterOperation(ctx);
newComponent.markAsValid(lsmIndex.isDurable());
} catch (Throwable e) { // NOSONAR: Log and re-throw
failedOperation = true;
@@ -637,7 +646,7 @@
throw e;
} finally {
exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation);
- operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent);
+ operation.getCallback().afterFinalize(ctx);
}
} finally {
/*
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 7bc0660..5e105a4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -22,26 +22,27 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
private final AbstractLSMIndex lsmIndex;
- private final ILSMDiskComponent component;
private final ILSMDiskComponentBulkLoader componentBulkLoader;
+ private ILSMIndexOperationContext opCtx;
- public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, float fillFactor, boolean verifyInput,
- long numElementsHint) throws HyracksDataException {
+ public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext opCtx, float fillFactor,
+ boolean verifyInput, long numElementsHint) throws HyracksDataException {
this.lsmIndex = lsmIndex;
+ this.opCtx = opCtx;
// Note that by using a flush target file name, we state that the
// new bulk loaded component is "newer" than any other merged component.
- this.component = lsmIndex.createBulkLoadTarget();
+ opCtx.setNewComponent(lsmIndex.createBulkLoadTarget());
this.componentBulkLoader =
- component.createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true, true);
+ opCtx.getNewComponent().createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true, true);
}
public ILSMDiskComponent getComponent() {
- return component;
+ return opCtx.getNewComponent();
}
@Override
@@ -57,15 +58,15 @@
public void end() throws HyracksDataException {
try {
componentBulkLoader.end();
- if (component.getComponentSize() > 0) {
+ if (opCtx.getNewComponent().getComponentSize() > 0) {
//TODO(amoudi): Ensure Bulk load follow the same lifecycle Other Operations (Flush, Merge, etc).
//then after operation should be called from harness as well
//https://issues.apache.org/jira/browse/ASTERIXDB-1764
- lsmIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.LOAD, null, component);
- lsmIndex.getHarness().addBulkLoadedComponent(component);
+ lsmIndex.getIOOperationCallback().afterOperation(opCtx);
+ lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getNewComponent());
}
} finally {
- lsmIndex.getIOOperationCallback().afterFinalize(LSMIOOperationType.LOAD, component);
+ lsmIndex.getIOOperationCallback().afterFinalize(opCtx);
}
}
@@ -73,9 +74,10 @@
public void abort() throws HyracksDataException {
try {
componentBulkLoader.abort();
- lsmIndex.getIOOperationCallback().afterOperation(LSMIOOperationType.LOAD, null, null);
+ opCtx.setNewComponent(null);
+ lsmIndex.getIOOperationCallback().afterOperation(opCtx);
} finally {
- lsmIndex.getIOOperationCallback().afterFinalize(LSMIOOperationType.LOAD, null);
+ lsmIndex.getIOOperationCallback().afterFinalize(opCtx);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
index eec2dca..3432624 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
@@ -18,16 +18,12 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
-import java.util.List;
-
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.common.IResource;
@@ -51,19 +47,17 @@
}
@Override
- public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException {
+ public void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
// Do nothing.
}
@Override
- public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
- ILSMDiskComponent newComponent) throws HyracksDataException {
+ public void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
// Do nothing.
}
@Override
- public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent)
- throws HyracksDataException {
+ public void afterFinalize(ILSMIndexOperationContext opCtx) throws HyracksDataException {
// Do nothing.
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java
index 88def5e..2c16be0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java
@@ -21,10 +21,9 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
/**
@@ -34,33 +33,29 @@
public class StubIOOperationCallback implements ILSMIOOperationCallback {
- private List<ILSMComponent> oldComponents = null;
- private ILSMDiskComponent newComponent = null;
+ private ILSMIndexOperationContext opCtx = null;
@Override
- public void beforeOperation(LSMIOOperationType opType) throws HyracksDataException {
+ public void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
// Not interested in this
}
@Override
- public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
- ILSMDiskComponent newComponent) throws HyracksDataException {
- this.oldComponents = oldComponents;
- this.newComponent = newComponent;
+ public void afterOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+ this.opCtx = opCtx;
}
@Override
- public synchronized void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent)
- throws HyracksDataException {
+ public void afterFinalize(ILSMIndexOperationContext opCtx) throws HyracksDataException {
// Redundant info from after
}
- public List<ILSMComponent> getLastOldComponents() {
- return oldComponents;
+ public List<ILSMDiskComponent> getLastOldComponents() {
+ return opCtx.getComponentsToBeMerged();
}
public ILSMDiskComponent getLastNewComponent() {
- return newComponent;
+ return opCtx.getNewComponent();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
index f29bffc..7b12250 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
@@ -50,7 +50,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
-import org.apache.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.impls.ExternalIndexHarness;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.rtree.impls.SearchPredicate;
@@ -323,10 +323,10 @@
if (!isActive) {
throw new HyracksDataException("Failed to deactivate the index since it is already deactivated.");
}
-
if (flushOnExit) {
- BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback);
- cb.afterFinalize(LSMIOOperationType.FLUSH, null);
+ AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpOperationCallback.INSTANCE, version);
+ opCtx.setIoOperationType(LSMIOOperationType.FLUSH);
+ ioOpCallback.afterFinalize(opCtx);
}
for (ILSMDiskComponent c : diskComponents) {