[ASTERIXDB-2161] Fix component id manage lifecycle
- user model changes: no
- storage format changes: no
- interface changes: yes. The interface of LMSIOOperationCallback
is changed
Details:
- The current way of management component ids is not correct,
in presence of that multiple partitions sharing the same primary op
tracker. It's possible when a partition is empty/being flushed,
the next flush is scheduled by another partition, which
will disturb the partition. This patch fixes this by
using the same logic of maintaining flushed LSNs to maintain
component id.
- Extend recycle memory component interface to indicate whether it
switches the new component or not.
- Also fixes [ASTERIXDB-2168] to ensure we do not miss latest flushed
LSNs by advancing io callback before finishing flush
Change-Id: Ifc35184c4d431db9af71cab302439e165ee55f54
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2153
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 70436b5..a239210 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -135,10 +135,11 @@
List<List<String>> partitioningKeys = new ArrayList<>();
partitioningKeys.add(Collections.singletonList("key"));
int partition = 0;
- dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
- NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
- partitioningKeys, null, null, null, false, null),
- null, DatasetType.INTERNAL, DATASET_ID, 0);
+ dataset =
+ new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+ NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+ partitioningKeys, null, null, null, false, null),
+ null, DatasetType.INTERNAL, DATASET_ID, 0);
PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, partition);
IndexDataflowHelperFactory iHelperFactory =
@@ -201,12 +202,17 @@
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+ ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
// rollback a memory component
lsmAccessor.deleteComponents(memoryComponentsPredicate);
searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
// rollback the last disk component
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+
+ dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+ ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
lsmAccessor.deleteComponents(pred);
searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
@@ -249,6 +255,9 @@
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+
+ dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+ ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
// rollback a memory component
lsmAccessor.deleteComponents(memoryComponentsPredicate);
searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
@@ -273,6 +282,9 @@
// rollback the last disk component
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+
+ dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+ ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
lsmAccessor.deleteComponents(pred);
searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
@@ -320,6 +332,9 @@
firstSearcher.waitUntilEntered();
// now that we enetered, we will rollback
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+
+ dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+ ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
// rollback a memory component
lsmAccessor.deleteComponents(
c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified()));
@@ -338,6 +353,9 @@
secondSearcher.waitUntilEntered();
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
+
+ dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+ ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
lsmAccessor.deleteComponents(pred);
// now that the rollback has completed, we will unblock the search
@@ -745,6 +763,8 @@
public void run() {
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
try {
+ dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+ ((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
lsmAccessor.deleteComponents(predicate);
} catch (HyracksDataException e) {
failure = e;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index 44967e3..ddcb5b5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -20,7 +20,6 @@
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
@@ -98,8 +97,7 @@
}
@Override
- public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent)
- throws HyracksDataException {
+ public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
super.afterFinalize(opType, newComponent);
synchronized (TestLsmBtreeIoOpCallbackFactory.this) {
if (newComponent != null) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index fe64d3b..8002895 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -362,9 +362,8 @@
for (IndexInfo iInfo : dsr.getIndexes().values()) {
AbstractLSMIOOperationCallback ioCallback =
(AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
- if (!(iInfo.getIndex().isCurrentMutableComponentEmpty()
- || ioCallback.hasPendingFlush() || opTracker.isFlushLogCreated()
- || opTracker.isFlushOnExit())) {
+ if (!(iInfo.getIndex().isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()
+ || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) {
long firstLSN = ioCallback.getFirstLSN();
if (firstLSN < targetLSN) {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -387,7 +386,15 @@
* This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled
*/
private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
- if (!dsInfo.isExternal() && dsInfo.isDurable()) {
+ if (dsInfo.isExternal()) {
+ // no memory components for external dataset
+ return;
+ }
+
+ ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID());
+ idGenerator.refresh();
+
+ if (dsInfo.isDurable()) {
synchronized (logRecord) {
TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null, logManager.getNodeId(),
dsInfo.getIndexes().size());
@@ -404,16 +411,14 @@
throw new HyracksDataException(e);
}
}
- for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
- //update resource lsn
- AbstractLSMIOOperationCallback ioOpCallback =
- (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
- ioOpCallback.updateLastLSN(logRecord.getLSN());
- }
}
- ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID());
- idGenerator.refresh();
+ for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
+ //update resource lsn
+ AbstractLSMIOOperationCallback ioOpCallback =
+ (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
+ ioOpCallback.updateLastLSN(logRecord.getLSN());
+ }
if (asyncFlush) {
for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index c33e2d1..1432f25 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -34,7 +34,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
-import org.apache.hyracks.storage.am.lsm.common.impls.EmptyComponent;
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
@@ -54,6 +53,12 @@
protected int readIndex;
// Index of the currently being written to component
protected int writeIndex;
+ // Index of the memory component to be recycled
+ protected int recycleIndex;
+ // Indicates whether this index has been scheduled to flush (no matter whether succeeds or not)
+ protected boolean hasFlushed;
+ // Keep track of the component Id of the next component being activated.
+ protected ILSMComponentId[] nextComponentIds;
protected final ILSMComponentIdGenerator idGenerator;
@@ -66,6 +71,12 @@
flushRequested = new boolean[count];
readIndex = 0;
writeIndex = 0;
+ recycleIndex = 0;
+ hasFlushed = false;
+ nextComponentIds = new ILSMComponentId[count];
+ if (count > 0) {
+ nextComponentIds[0] = idGenerator.getId();
+ }
}
@Override
@@ -84,33 +95,50 @@
if (writeIndex != readIndex) {
firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
}
+
}
}
}
@Override
- public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException {
- // The operation was complete and the next I/O operation for the LSM index didn't start yet
- if (opType == LSMIOOperationType.FLUSH && newComponent != null) {
- synchronized (this) {
- flushRequested[readIndex] = false;
- // if the component which just finished flushing is the component that will be modified next,
- // we set its first LSN to its previous LSN
- if (readIndex == writeIndex) {
- firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
+ public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
+ ILSMDiskComponent newComponent) throws HyracksDataException {
+ //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
+ if (newComponent != null) {
+ putLSNIntoMetadata(newComponent, oldComponents);
+ putComponentIdIntoMetadata(opType, newComponent, oldComponents);
+ if (opType == LSMIOOperationType.MERGE) {
+ // In case of merge, oldComponents are never null
+ LongPointable markerLsn =
+ LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(),
+ ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND));
+ newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
+ } else if (opType == LSMIOOperationType.FLUSH) {
+ // advance memory component indexes
+ synchronized (this) {
+ // we've already consumed the specified LSN/component id.
+ // Now we can advance to the next component
+ flushRequested[readIndex] = false;
+ // if the component which just finished flushing is the component that will be modified next,
+ // we set its first LSN to its previous LSN
+ if (readIndex == writeIndex) {
+ firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
+ }
+ readIndex = (readIndex + 1) % mutableLastLSNs.length;
}
- readIndex = (readIndex + 1) % mutableLastLSNs.length;
- }
- if (newComponent == EmptyComponent.INSTANCE) {
- // This component was just deleted, we refresh the component id, when it gets recycled, it will get
- // the new id from the component id generator.
- // It is assumed that the component delete caller will ensure that corresponding components in secondary
- // indexes are deleted as well
- idGenerator.refresh();
}
}
}
+ @Override
+ public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
+ // The operation was complete and the next I/O operation for the LSM index didn't start yet
+ if (opType == LSMIOOperationType.FLUSH) {
+ hasFlushed = true;
+ }
+
+ }
+
public void putLSNIntoMetadata(ILSMDiskComponent newComponent, List<ILSMComponent> oldComponents)
throws HyracksDataException {
newComponent.getMetadata().put(LSN_KEY, LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents)));
@@ -148,6 +176,13 @@
//Moreover, since the memory component is already being flushed, the next scheduleFlush request must fail.
//See https://issues.apache.org/jira/browse/ASTERIXDB-1917
mutableLastLSNs[writeIndex] = lastLSN;
+ if (hasFlushed || lsmIndex.isMemoryComponentsAllocated()) {
+ // we only (re)set next component id if either this index has been flushed (no matter succeed or not)
+ // or the memory component has been allocated
+ // This prevents the case where indexes in a partition are being allocated, while another partition
+ // tries to schedule flush
+ nextComponentIds[writeIndex] = idGenerator.getId();
+ }
}
}
@@ -164,7 +199,6 @@
}
public synchronized boolean hasPendingFlush() {
-
for (int i = 0; i < flushRequested.length; i++) {
if (flushRequested[i]) {
return true;
@@ -173,23 +207,6 @@
return false;
}
- @Override
- public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
- ILSMDiskComponent newComponent) throws HyracksDataException {
- //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
- if (newComponent != null) {
- putLSNIntoMetadata(newComponent, oldComponents);
- putComponentIdIntoMetadata(opType, newComponent, oldComponents);
- if (opType == LSMIOOperationType.MERGE) {
- // In case of merge, oldComponents are never null
- LongPointable markerLsn =
- LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(),
- ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND));
- newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
- }
- }
- }
-
public long getComponentLSN(List<? extends ILSMComponent> diskComponents) throws HyracksDataException {
if (diskComponents == null) {
// Implies a flush IO operation. --> moves the flush pointer
@@ -208,14 +225,26 @@
return maxLSN;
}
+ private synchronized ILSMComponentId getLSMComponentId() {
+ return nextComponentIds[recycleIndex];
+ }
+
@Override
- public void recycled(ILSMMemoryComponent component) throws HyracksDataException {
- component.resetId(idGenerator.getId());
+ public void recycled(ILSMMemoryComponent component, boolean componentSwitched) throws HyracksDataException {
+ ILSMComponentId componentId = getLSMComponentId();
+ component.resetId(componentId);
+ if (componentSwitched) {
+ recycleIndex = (recycleIndex + 1) % nextComponentIds.length;
+ }
}
@Override
public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
- component.resetId(idGenerator.getId());
+ if (component == lsmIndex.getCurrentMemoryComponent()) {
+ // only set the component id for the first (current) memory component
+ ILSMComponentId componentId = getLSMComponentId();
+ component.resetId(componentId);
+ }
}
/**
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
new file mode 100644
index 0000000..0f2ea50
--- /dev/null
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.ioopcallbacks;
+
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import junit.framework.TestCase;
+
+public abstract class AbstractLSMIOOperationCallbackTest extends TestCase {
+
+ @Test
+ public void testNormalSequence() throws HyracksDataException {
+ ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+ Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+ Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
+ LSMBTreeIOOperationCallback callback =
+ new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+
+ //request to flush first component
+ callback.updateLastLSN(1);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
+
+ //request to flush second component
+ callback.updateLastLSN(2);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
+
+ Assert.assertEquals(1, callback.getComponentLSN(null));
+ callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
+ callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+
+ Assert.assertEquals(2, callback.getComponentLSN(null));
+
+ callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
+ callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+ }
+
+ @Test
+ public void testOverWrittenLSN() throws HyracksDataException {
+ ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+ Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+ Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
+
+ LSMBTreeIOOperationCallback callback =
+ new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+
+ //request to flush first component
+ callback.updateLastLSN(1);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
+
+ //request to flush second component
+ callback.updateLastLSN(2);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
+
+ //request to flush first component again
+ //this call should fail
+ callback.updateLastLSN(3);
+ //there is no corresponding beforeOperation, since the first component is being flush
+ //the scheduleFlush request would fail this time
+
+ Assert.assertEquals(1, callback.getComponentLSN(null));
+ callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
+ callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+
+ Assert.assertEquals(2, callback.getComponentLSN(null));
+ callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+ }
+
+ @Test
+ public void testLostLSN() throws HyracksDataException {
+ ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+ Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+ Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
+
+ LSMBTreeIOOperationCallback callback =
+ new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+
+ //request to flush first component
+ callback.updateLastLSN(1);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
+
+ //request to flush second component
+ callback.updateLastLSN(2);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
+
+ Assert.assertEquals(1, callback.getComponentLSN(null));
+
+ // the first flush is finished, but has not finalized yet (in codebase, these two calls
+ // are not synchronized)
+ callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
+
+ //request to flush first component again
+ callback.updateLastLSN(3);
+
+ // the first flush is finalized (it may be called after afterOperation for a while)
+ callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+
+ // the second flush gets LSN 2
+ Assert.assertEquals(2, callback.getComponentLSN(null));
+ // the second flush is finished
+ callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
+ callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+
+ // it should get new LSN 3
+ Assert.assertEquals(3, callback.getComponentLSN(null));
+ }
+
+ @Test
+ public void testAllocateComponentId() throws HyracksDataException {
+ ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
+ ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+ Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+ ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
+ Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
+
+ LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+
+ ILSMComponentId initialId = idGenerator.getId();
+ // simulate a partition is flushed before allocated
+ idGenerator.refresh();
+ callback.updateLastLSN(0);
+
+ callback.allocated(mockComponent);
+ checkMemoryComponent(initialId, mockComponent);
+ }
+
+ @Test
+ public void testRecycleComponentId() throws HyracksDataException {
+ ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
+ ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+ Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+ ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
+ Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
+ LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+
+ ILSMComponentId id = idGenerator.getId();
+ callback.allocated(mockComponent);
+ checkMemoryComponent(id, mockComponent);
+
+ Mockito.when(mockIndex.isMemoryComponentsAllocated()).thenReturn(true);
+ for (int i = 0; i < 100; i++) {
+ // schedule a flush
+ idGenerator.refresh();
+ ILSMComponentId expectedId = idGenerator.getId();
+
+ callback.updateLastLSN(0);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
+ callback.recycled(mockComponent, true);
+
+ callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
+ callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+ checkMemoryComponent(expectedId, mockComponent);
+ }
+ }
+
+ @Test
+ public void testRecycleWithoutSwitch() throws HyracksDataException {
+ ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
+ ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+ Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+ ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
+ Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
+ LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+
+ ILSMComponentId id = idGenerator.getId();
+ callback.allocated(mockComponent);
+ checkMemoryComponent(id, mockComponent);
+
+ Mockito.when(mockIndex.isMemoryComponentsAllocated()).thenReturn(true);
+
+ for (int i = 0; i < 10; i++) {
+ idGenerator.refresh();
+ id = idGenerator.getId();
+ callback.updateLastLSN(0);
+ callback.recycled(mockComponent, false);
+ callback.afterFinalize(LSMIOOperationType.FLUSH, null);
+ checkMemoryComponent(id, mockComponent);
+ }
+ }
+
+ @Test
+ public void testConcurrentRecycleComponentId() throws HyracksDataException {
+ ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
+ ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+ ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
+ Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
+ Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+ LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+
+ ILSMComponentId id = idGenerator.getId();
+ callback.allocated(mockComponent);
+ checkMemoryComponent(id, mockComponent);
+
+ Mockito.when(mockIndex.isMemoryComponentsAllocated()).thenReturn(true);
+
+ // schedule a flush
+ idGenerator.refresh();
+ ILSMComponentId expectedId = idGenerator.getId();
+
+ callback.updateLastLSN(0);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+
+ // another flush is to be scheduled before the component is recycled
+ idGenerator.refresh();
+ ILSMComponentId nextId = idGenerator.getId();
+
+ // recycle the component
+ callback.recycled(mockComponent, true);
+ checkMemoryComponent(expectedId, mockComponent);
+
+ // schedule the next flush
+ callback.updateLastLSN(0);
+ callback.beforeOperation(LSMIOOperationType.FLUSH);
+ callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ callback.recycled(mockComponent, true);
+ checkMemoryComponent(nextId, mockComponent);
+ }
+
+ private void checkMemoryComponent(ILSMComponentId expected, ILSMMemoryComponent memoryComponent)
+ throws HyracksDataException {
+ ArgumentCaptor<ILSMComponentId> argument = ArgumentCaptor.forClass(ILSMComponentId.class);
+ Mockito.verify(memoryComponent).resetId(argument.capture());
+ assertEquals(expected, argument.getValue());
+
+ Mockito.reset(memoryComponent);
+ }
+
+ private ILSMDiskComponent mockDiskComponent() {
+ ILSMDiskComponent component = Mockito.mock(ILSMDiskComponent.class);
+ Mockito.when(component.getMetadata()).thenReturn(Mockito.mock(DiskComponentMetadata.class));
+ return component;
+ }
+
+ protected abstract AbstractLSMIOOperationCallback getIoCallback();
+}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
index f467ee8..c22e2e3 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
@@ -19,72 +19,19 @@
package org.apache.asterix.test.ioopcallbacks;
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
-import org.junit.Assert;
import org.mockito.Mockito;
-import junit.framework.TestCase;
+public class LSMBTreeIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
-public class LSMBTreeIOOperationCallbackTest extends TestCase {
-
- public void testNormalSequence() {
- try {
- ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
- Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMBTreeIOOperationCallback callback =
- new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
- //request to flush first component
- callback.updateLastLSN(1);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- //request to flush second component
- callback.updateLastLSN(2);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
- Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- public void testOverWrittenLSN() {
- try {
- ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
- Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMBTreeIOOperationCallback callback =
- new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
- //request to flush first component
- callback.updateLastLSN(1);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- //request to flush second component
- callback.updateLastLSN(2);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- //request to flush first component again
- //this call should fail
- callback.updateLastLSN(3);
- //there is no corresponding beforeOperation, since the first component is being flush
- //the scheduleFlush request would fail this time
-
- Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
- Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
- } catch (Exception e) {
- Assert.fail();
- }
+ @Override
+ protected AbstractLSMIOOperationCallback getIoCallback() {
+ ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+ Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+ return new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
}
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
index 63c46f7..356c80a 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
@@ -19,72 +19,19 @@
package org.apache.asterix.test.ioopcallbacks;
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
-import org.junit.Assert;
import org.mockito.Mockito;
-import junit.framework.TestCase;
+public class LSMBTreeWithBuddyIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
-public class LSMBTreeWithBuddyIOOperationCallbackTest extends TestCase {
-
- public void testNormalSequence() {
- try {
- ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
- Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMBTreeWithBuddyIOOperationCallback callback =
- new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
- //request to flush first component
- callback.updateLastLSN(1);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- //request to flush second component
- callback.updateLastLSN(2);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
- Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- public void testOverWrittenLSN() {
- try {
- ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
- Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMBTreeWithBuddyIOOperationCallback callback =
- new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
- //request to flush first component
- callback.updateLastLSN(1);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- //request to flush second component
- callback.updateLastLSN(2);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- //request to flush first component again
- //this call should fail
- callback.updateLastLSN(3);
- //there is no corresponding beforeOperation, since the first component is being flush
- //the scheduleFlush request would fail this time
-
- Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
- Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
- } catch (Exception e) {
- Assert.fail();
- }
+ @Override
+ protected AbstractLSMIOOperationCallback getIoCallback() {
+ ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+ Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+ return new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
}
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
index 1e961d8..ac4595e 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
@@ -19,72 +19,19 @@
package org.apache.asterix.test.ioopcallbacks;
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
-import org.junit.Assert;
import org.mockito.Mockito;
-import junit.framework.TestCase;
+public class LSMInvertedIndexIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
-public class LSMInvertedIndexIOOperationCallbackTest extends TestCase {
-
- public void testNormalSequence() {
- try {
- ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
- Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMInvertedIndexIOOperationCallback callback =
- new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
- //request to flush first component
- callback.updateLastLSN(1);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- //request to flush second component
- callback.updateLastLSN(2);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
- Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- public void testOverWrittenLSN() {
- try {
- ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
- Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMInvertedIndexIOOperationCallback callback =
- new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
- //request to flush first component
- callback.updateLastLSN(1);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- //request to flush second component
- callback.updateLastLSN(2);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- //request to flush first component again
- //this call should fail
- callback.updateLastLSN(3);
- //there is no corresponding beforeOperation, since the first component is being flush
- //the scheduleFlush request would fail this time
-
- Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
- Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
- } catch (Exception e) {
- Assert.fail();
- }
+ @Override
+ protected AbstractLSMIOOperationCallback getIoCallback() {
+ ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+ Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+ return new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
}
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
index 618f2a3..0131e3f 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
@@ -19,72 +19,19 @@
package org.apache.asterix.test.ioopcallbacks;
+import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
-import org.junit.Assert;
import org.mockito.Mockito;
-import junit.framework.TestCase;
+public class LSMRTreeIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
-public class LSMRTreeIOOperationCallbackTest extends TestCase {
-
- public void testNormalSequence() {
- try {
- ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
- Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMRTreeIOOperationCallback callback =
- new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
- //request to flush first component
- callback.updateLastLSN(1);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- //request to flush second component
- callback.updateLastLSN(2);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
- Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- public void testOverWrittenLSN() {
- try {
- ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
- Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMRTreeIOOperationCallback callback =
- new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
- //request to flush first component
- callback.updateLastLSN(1);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- //request to flush second component
- callback.updateLastLSN(2);
- callback.beforeOperation(LSMIOOperationType.FLUSH);
-
- //request to flush first component again
- //this call should fail
- callback.updateLastLSN(3);
- //there is no corresponding beforeOperation, since the first component is being flush
- //the scheduleFlush request would fail this time
-
- Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
-
- Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
- } catch (Exception e) {
- Assert.fail();
- }
+ @Override
+ protected AbstractLSMIOOperationCallback getIoCallback() {
+ ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
+ Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+ return new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
index e122fd4..7bb12c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
@@ -60,8 +60,9 @@
* This method is called when a memory component is recycled
*
* @param component
+ * @param componentSwitched
*/
- void recycled(ILSMMemoryComponent component) throws HyracksDataException;
+ void recycled(ILSMMemoryComponent component, boolean componentSwitched) throws HyracksDataException;
/**
* This method is called when a memory component is allocated
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index 17dadcb..15cf8e5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -61,7 +61,7 @@
if (state == ComponentState.INACTIVE && requestedToBeActive) {
state = ComponentState.READABLE_WRITABLE;
requestedToBeActive = false;
- lsmIndex.getIOOperationCallback().recycled(this);
+ lsmIndex.getIOOperationCallback().recycled(this, true);
}
switch (opType) {
case FORCE_MODIFICATION:
@@ -278,7 +278,7 @@
if (this.componentId != null && !componentId.missing() // for backward compatibility
&& this.componentId.compareTo(componentId) != IdCompareResult.LESS_THAN) {
throw new IllegalStateException(
- "LSM memory component receives illegal id. Old id " + this.componentId + ", new id " + componentId);
+ this + " receives illegal id. Old id " + this.componentId + ", new id " + componentId);
}
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.log(Level.INFO, "Component Id was reset from " + this.componentId + " to " + componentId);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
index 12dbb46..e464231 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
@@ -64,8 +64,8 @@
}
@Override
- public void recycled(ILSMMemoryComponent component) throws HyracksDataException {
- wrappedCallback.recycled(component);
+ public void recycled(ILSMMemoryComponent component, boolean componentSwitched) throws HyracksDataException {
+ wrappedCallback.recycled(component, componentSwitched);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index e1d5114..393aa6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -175,7 +175,7 @@
// Call recycled only when we change it's state is reset back to READABLE_WRITABLE
// Otherwise, if the component is in other state, e.g., INACTIVE, or
// READABLE_UNWRITABLE_FLUSHING, it's not considered as being recycled here.
- lsmIndex.getIOOperationCallback().recycled(flushingComponent);
+ lsmIndex.getIOOperationCallback().recycled(flushingComponent, false);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
index 21d10d7..3a58c19 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
@@ -67,7 +67,7 @@
}
@Override
- public void recycled(ILSMMemoryComponent component) {
+ public void recycled(ILSMMemoryComponent component, boolean componentSwitched) {
// Do nothing.
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java
index 238e915..88def5e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/StubIOOperationCallback.java
@@ -64,7 +64,7 @@
}
@Override
- public void recycled(ILSMMemoryComponent component) {
+ public void recycled(ILSMMemoryComponent component, boolean componentSwitched) {
// Not interested in this
}