Add Disk Component Scan operation for primary LSM index
-Added a disk component scan operation for the primary LSMBTree index,
which will be used when creating a new secondary index
-This operation scans all disk components of the primary index,
and returns all tuples. Thus, tuples with the same primary key
in different components are returned separately.
-The returned tuple has an extra int field, which indicates
which component this tuple comes from, and a boolean flag,
which indicates whether this tuple is an anti-matter tuple or not.
Change-Id: I31b2c67c58cb0a440c1d2c26400af322e2f1c1e5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1791
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 23031bc..3c70dba 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -89,6 +89,8 @@
public static final int CANNOT_CLEAR_INACTIVE_INDEX = 53;
public static final int CANNOT_ALLOCATE_MEMORY_FOR_INACTIVE_INDEX = 54;
public static final int RESOURCE_DOES_NOT_EXIST = 55;
+ public static final int DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX = 56;
+ public static final int CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE = 57;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 88e4204..604b534 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -74,5 +74,6 @@
53 = Failed to clear the index since it is inactive
54 = Failed to allocate memory components for the index since it is inactive
55 = Resource does not exist for %1$s
-
+56 = LSM disk component scan is not allowed for a secondary index
+57 = Couldn't find the matter tuple for anti-matter tuple in the primary index
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java
index 3df27ad..bd18ea0 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java
@@ -26,6 +26,9 @@
import org.apache.hyracks.data.std.api.IPointableFactory;
public final class BooleanPointable extends AbstractPointable implements IHashable, IComparable {
+
+ public static final BooleanPointableFactory FACTORY = new BooleanPointableFactory();
+
public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
private static final long serialVersionUID = 1L;
@@ -40,7 +43,7 @@
}
};
- public static final IPointableFactory FACTORY = new IPointableFactory() {
+ public static class BooleanPointableFactory implements IPointableFactory {
private static final long serialVersionUID = 1L;
@Override
@@ -48,11 +51,17 @@
return new BooleanPointable();
}
+ public IPointable createPointable(boolean value) {
+ BooleanPointable pointable = new BooleanPointable();
+ pointable.setBoolean(value);
+ return pointable;
+ }
+
@Override
public ITypeTraits getTypeTraits() {
return TYPE_TRAITS;
}
- };
+ }
public static boolean getBoolean(byte[] bytes, int start) {
return bytes[start] != 0;
@@ -67,6 +76,11 @@
}
public void setBoolean(boolean value) {
+ if (bytes == null) {
+ start = 0;
+ length = TYPE_TRAITS.getFixedLength();
+ bytes = new byte[length];
+ }
setBoolean(bytes, start, value);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
index 43e0889..71a3f71 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
@@ -32,5 +32,6 @@
MERGE,
FULL_MERGE,
FLUSH,
- REPLICATE
+ REPLICATE,
+ DISK_COMPONENT_SCAN
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index b224e87..f5f908f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -286,6 +286,19 @@
}
@Override
+ public void scanDiskComponents(ILSMIndexOperationContext ictx, IIndexCursor cursor) throws HyracksDataException {
+ if (!isPrimaryIndex()) {
+ throw HyracksDataException.create(ErrorCode.DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX);
+ }
+ LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx;
+ List<ILSMComponent> operationalComponents = ctx.getComponentHolder();
+ MultiComparator comp = MultiComparator.create(getComparatorFactories());
+ ISearchPredicate pred = new RangePredicate(null, null, true, true, comp, comp);
+ ctx.getSearchInitialState().reset(pred, operationalComponents);
+ ((LSMBTreeSearchCursor) cursor).scan(ctx.getSearchInitialState(), pred);
+ }
+
+ @Override
public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
LSMBTreeFlushOperation flushOp = (LSMBTreeFlushOperation) operation;
LSMBTreeMemoryComponent flushingComponent = (LSMBTreeMemoryComponent) flushOp.getFlushingComponent();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
new file mode 100644
index 0000000..3c9f709
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.btree.impls;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.BooleanPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import org.apache.hyracks.storage.am.btree.impls.BTree;
+import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
+import org.apache.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+import org.apache.hyracks.storage.common.ICursorInitialState;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.MultiComparator;
+
+public class LSMBTreeDiskComponentScanCursor extends LSMIndexSearchCursor {
+
+ private static final IValueReference MATTER_TUPLE_FLAG = BooleanPointable.FACTORY.createPointable(false);
+ private static final IValueReference ANTIMATTER_TUPLE_FLAG = BooleanPointable.FACTORY.createPointable(true);
+
+ private BTreeAccessor[] btreeAccessors;
+
+ private ArrayTupleBuilder tupleBuilder;
+ private ArrayTupleBuilder antiMatterTupleBuilder;
+ private final ArrayTupleReference outputTuple;
+ private PermutingTupleReference originalTuple;
+
+ private boolean foundNext;
+
+ private IntegerPointable cursorIndexPointable;
+
+ public LSMBTreeDiskComponentScanCursor(ILSMIndexOperationContext opCtx) {
+ super(opCtx, true);
+ this.outputTuple = new ArrayTupleReference();
+ }
+
+ @Override
+ public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
+ LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState;
+ cmp = lsmInitialState.getOriginalKeyComparator();
+ operationalComponents = lsmInitialState.getOperationalComponents();
+ lsmHarness = lsmInitialState.getLSMHarness();
+ includeMutableComponent = false;
+
+ int numBTrees = operationalComponents.size();
+ rangeCursors = new IIndexCursor[numBTrees];
+ btreeAccessors = new BTreeAccessor[numBTrees];
+ for (int i = 0; i < numBTrees; i++) {
+ ILSMComponent component = operationalComponents.get(i);
+ IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) lsmInitialState.getLeafFrameFactory().createFrame();
+ rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
+ BTree btree = ((LSMBTreeDiskComponent) component).getBTree();
+
+ btreeAccessors[i] = (BTreeAccessor) btree.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ btreeAccessors[i].search(rangeCursors[i], searchPred);
+ }
+
+ cursorIndexPointable = new IntegerPointable();
+ int length = IntegerPointable.TYPE_TRAITS.getFixedLength();
+ cursorIndexPointable.set(new byte[length], 0, length);
+
+ setPriorityQueueComparator();
+ initPriorityQueue();
+ }
+
+ @Override
+ public void next() throws HyracksDataException {
+ foundNext = false;
+ }
+
+ @Override
+ public boolean hasNext() throws HyracksDataException {
+ if (foundNext) {
+ return true;
+ }
+ while (super.hasNext()) {
+ super.next();
+ LSMBTreeTupleReference diskTuple = (LSMBTreeTupleReference) super.getTuple();
+ if (diskTuple.isAntimatter()) {
+ setAntiMatterTuple(diskTuple, outputElement.getCursorIndex());
+ } else {
+ //matter tuple
+ setMatterTuple(diskTuple, outputElement.getCursorIndex());
+ }
+ foundNext = true;
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ protected int compare(MultiComparator cmp, ITupleReference tupleA, ITupleReference tupleB)
+ throws HyracksDataException {
+ // This method is used to check whether tupleA and tupleB (from different disk components) are identical.
+ // If so, the tuple from the older component is ignored by default.
+ // Here we always report the two tuples as different, so that tuples with the same key coming from
+ // different disk components are all returned by the cursor instead of being collapsed into one.
+ return -1;
+ }
+
+ private void setMatterTuple(ITupleReference diskTuple, int cursorIndex) throws HyracksDataException {
+ if (tupleBuilder == null) {
+ tupleBuilder = new ArrayTupleBuilder(diskTuple.getFieldCount() + 2);
+ antiMatterTupleBuilder = new ArrayTupleBuilder(diskTuple.getFieldCount() + 2);
+ int[] permutation = new int[diskTuple.getFieldCount()];
+ for (int i = 0; i < permutation.length; i++) {
+ permutation[i] = i + 2;
+ }
+ originalTuple = new PermutingTupleReference(permutation);
+ }
+
+ //build the matter tuple
+ buildTuple(tupleBuilder, diskTuple, cursorIndex, MATTER_TUPLE_FLAG);
+ outputTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+ originalTuple.reset(outputTuple);
+ }
+
+ private void setAntiMatterTuple(ITupleReference diskTuple, int cursorIndex) throws HyracksDataException {
+ if (originalTuple == null || cmp.compare(diskTuple, originalTuple) != 0) {
+ //This shouldn't happen, because we shouldn't place an anti-matter tuple
+ //into the primary index if that tuple is not there
+ throw HyracksDataException.create(ErrorCode.CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE);
+ }
+ buildTuple(antiMatterTupleBuilder, originalTuple, cursorIndex, ANTIMATTER_TUPLE_FLAG);
+ outputTuple.reset(antiMatterTupleBuilder.getFieldEndOffsets(), antiMatterTupleBuilder.getByteArray());
+ }
+
+ private void buildTuple(ArrayTupleBuilder builder, ITupleReference diskTuple, int cursorIndex,
+ IValueReference tupleFlag) throws HyracksDataException {
+ builder.reset();
+ cursorIndexPointable.setInteger(cursorIndex);
+ builder.addField(cursorIndexPointable);
+ builder.addField(tupleFlag);
+ for (int i = 0; i < diskTuple.getFieldCount(); i++) {
+ builder.addField(diskTuple.getFieldData(i), diskTuple.getFieldStart(i), diskTuple.getFieldLength(i));
+ }
+ }
+
+ @Override
+ public ITupleReference getTuple() {
+ return outputTuple;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (lsmHarness != null) {
+ try {
+ for (int i = 0; i < rangeCursors.length; i++) {
+ rangeCursors[i].close();
+ }
+ rangeCursors = null;
+ } finally {
+ lsmHarness.endScanDiskComponents(opCtx);
+ }
+ }
+ foundNext = false;
+ }
+
+ @Override
+ protected void setPriorityQueueComparator() {
+ if (pqCmp == null || cmp != pqCmp.getMultiComparator()) {
+ pqCmp = new PriorityQueueScanComparator(cmp);
+ }
+ }
+
+ private class PriorityQueueScanComparator extends PriorityQueueComparator {
+ public PriorityQueueScanComparator(MultiComparator cmp) {
+ super(cmp);
+ }
+
+ @Override
+ public int compare(PriorityQueueElement elementA, PriorityQueueElement elementB) {
+ int result;
+ try {
+ result = cmp.compare(elementA.getTuple(), elementB.getTuple());
+ if (result != 0) {
+ return result;
+ }
+ } catch (HyracksDataException e) {
+ throw new IllegalArgumentException(e);
+ }
+ // the components in the component list are in descending order of creation time
+ // we want older components to be returned first
+ return elementA.getCursorIndex() > elementB.getCursorIndex() ? -1 : 1;
+
+ }
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java
index c9bee31..fef8afe 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java
@@ -37,22 +37,30 @@
private final LSMBTreePointSearchCursor pointCursor;
private final LSMBTreeRangeSearchCursor rangeCursor;
+ private final LSMBTreeDiskComponentScanCursor scanCursor;
private ITreeIndexCursor currentCursor;
public LSMBTreeSearchCursor(ILSMIndexOperationContext opCtx) {
pointCursor = new LSMBTreePointSearchCursor(opCtx);
rangeCursor = new LSMBTreeRangeSearchCursor(opCtx);
+ scanCursor = new LSMBTreeDiskComponentScanCursor(opCtx);
}
@Override
public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState;
RangePredicate btreePred = (RangePredicate) searchPred;
+
currentCursor =
btreePred.isPointPredicate(lsmInitialState.getOriginalKeyComparator()) ? pointCursor : rangeCursor;
currentCursor.open(lsmInitialState, searchPred);
}
+ public void scan(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
+ currentCursor = scanCursor;
+ currentCursor.open(initialState, searchPred);
+ }
+
@Override
public boolean hasNext() throws HyracksDataException {
return currentCursor.hasNext();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 0fe7e04..c0a3f2d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -81,6 +81,25 @@
void endSearch(ILSMIndexOperationContext ctx) throws HyracksDataException;
/**
+ * Scan all disk components of the index
+ *
+ * @param ctx
+ * the search operation context
+ * @param cursor
+ * the index cursor
+ * @throws HyracksDataException
+ */
+ void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException;
+
+ /**
+ * End the scan
+ *
+ * @param ctx
+ * @throws HyracksDataException
+ */
+ void endScanDiskComponents(ILSMIndexOperationContext ctx) throws HyracksDataException;
+
+ /**
* Schedule a merge
*
* @param ctx
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 5b3872c..c7032ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -65,6 +65,8 @@
void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException;
+ public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException;
+
void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException;
ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index ad53e73..1042df2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.common.IIndexAccessor;
+import org.apache.hyracks.storage.common.IIndexCursor;
/**
* Client handle for performing operations
@@ -230,4 +231,18 @@
* @throws HyracksDataException
*/
void forceUpdateMeta(IValueReference key, IValueReference value) throws HyracksDataException;
+
+ /**
+ * Open the given cursor for scanning all disk components of the primary index.
+ * The returned tuple has the format of [(int) disk_component_position, (boolean) anti-matter flag,
+ * primary key, payload].
+ * The returned tuples are first ordered on primary key, and then ordered on the descending order of
+ * disk_component_position (older components get returned first)
+ *
+ * @param cursor
+ * Cursor to be opened over the tuples of all disk components.
+ * @throws HyracksDataException
+ * If the BufferCache throws while un/pinning or un/latching.
+ */
+ void scanDiskComponents(IIndexCursor cursor) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/LSMOperationType.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/LSMOperationType.java
index b083770..63d2697 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/LSMOperationType.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/LSMOperationType.java
@@ -24,5 +24,6 @@
FORCE_MODIFICATION,
FLUSH,
MERGE,
- REPLICATE
+ REPLICATE,
+ DISK_COMPONENT_SCAN
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 9e5a230..64b8fec 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -47,6 +47,7 @@
case MODIFICATION:
case REPLICATE:
case SEARCH:
+ case DISK_COMPONENT_SCAN:
readerCount++;
break;
case MERGE:
@@ -80,6 +81,7 @@
case MODIFICATION:
case REPLICATE:
case SEARCH:
+ case DISK_COMPONENT_SCAN:
readerCount--;
if (readerCount == 0 && state == ComponentState.READABLE_MERGING) {
state = ComponentState.INACTIVE;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 4ee9769..2be2184 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -59,6 +59,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
@@ -324,12 +325,20 @@
case REPLICATE:
operationalComponents.addAll(ctx.getComponentsToBeReplicated());
break;
+ case DISK_COMPONENT_SCAN:
+ operationalComponents.addAll(immutableComponents);
+ break;
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
}
@Override
+ public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException {
+ throw HyracksDataException.create(ErrorCode.DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX);
+ }
+
+ @Override
public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 1502706..50eac67 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -26,6 +26,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
import org.apache.hyracks.data.std.api.IValueReference;
@@ -454,6 +455,33 @@
}
@Override
+ public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException {
+ if (!lsmIndex.isPrimaryIndex()) {
+ throw HyracksDataException.create(ErrorCode.DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX);
+ }
+ LSMOperationType opType = LSMOperationType.DISK_COMPONENT_SCAN;
+ getAndEnterComponents(ctx, opType, false);
+ try {
+ ctx.getSearchOperationCallback().before(null);
+ lsmIndex.scanDiskComponents(ctx, cursor);
+ } catch (Exception e) {
+ exitComponents(ctx, opType, null, true);
+ throw e;
+ }
+ }
+
+ @Override
+ public void endScanDiskComponents(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ if (ctx.getOperation() == IndexOperation.DISK_COMPONENT_SCAN) {
+ try {
+ exitComponents(ctx, LSMOperationType.DISK_COMPONENT_SCAN, null, false);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
+
+ @Override
public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 20d71e2..b9714a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -29,13 +29,13 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
-import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
@@ -213,4 +213,10 @@
IFrameOperationCallback frameOpCallback) throws HyracksDataException {
lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback);
}
+
+ @Override
+ public void scanDiskComponents(IIndexCursor cursor) throws HyracksDataException {
+ ctx.setOperation(IndexOperation.DISK_COMPONENT_SCAN);
+ lsmHarness.scanDiskComponents(ctx, cursor);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index fb5cc9f..7751116 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -20,6 +20,7 @@
import java.util.List;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -202,4 +203,10 @@
public void forceUpsert(ITupleReference tuple) throws HyracksDataException {
throw new UnsupportedOperationException("Upsert not supported by lsm inverted index.");
}
+
+ @Override
+ public void scanDiskComponents(IIndexCursor cursor) throws HyracksDataException {
+ throw HyracksDataException.create(ErrorCode.DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX);
+
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java
new file mode 100644
index 0000000..3ba53dd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.btree;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.util.Random;
+
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.BooleanPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.storage.am.btree.OrderedIndexTestContext;
+import org.apache.hyracks.storage.am.btree.OrderedIndexTestDriver;
+import org.apache.hyracks.storage.am.btree.OrderedIndexTestUtils;
+import org.apache.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
+import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestContext;
+import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallbackFactory;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+/**
+ * Exercises the disk component scan of an {@link LSMBTree}. The scan is expected to return the
+ * tuples of every disk component, each one prefixed by two extra fields: the position of the
+ * component the tuple was read from and a flag telling whether it is an anti-matter tuple.
+ */
+@SuppressWarnings("rawtypes")
+public class LSMBTreeScanDiskComponentsTest extends OrderedIndexTestDriver {
+
+    // Number of fields the scan places in front of the stored fields
+    // (field 0: component position, field 1: anti-matter flag).
+    private static final int NUM_SCAN_HEADER_FIELDS = 2;
+
+    private final OrderedIndexTestUtils orderedIndexTestUtils;
+
+    private final LSMBTreeTestHarness harness = new LSMBTreeTestHarness();
+
+    public LSMBTreeScanDiskComponentsTest() {
+        super(LSMBTreeTestHarness.LEAF_FRAMES_TO_TEST);
+        this.orderedIndexTestUtils = new OrderedIndexTestUtils();
+    }
+
+    @Before
+    public void setUp() throws HyracksDataException {
+        harness.setUp();
+    }
+
+    @After
+    public void tearDown() throws HyracksDataException {
+        harness.tearDown();
+    }
+
+    /**
+     * Creates an LSM B-tree test context backed by the resources of the test harness.
+     * Note that {@code leafType} and {@code filtered} are not consulted here; a literal
+     * {@code false} is passed as the last argument of the factory method.
+     */
+    @Override
+    protected OrderedIndexTestContext createTestContext(ISerializerDeserializer[] fieldSerdes, int numKeys,
+            BTreeLeafFrameType leafType, boolean filtered) throws Exception {
+        return LSMBTreeTestContext.create(harness.getIOManager(), harness.getVirtualBufferCaches(),
+                harness.getFileReference(), harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), fieldSerdes,
+                numKeys, harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
+                harness.getOperationTracker(), harness.getIOScheduler(), harness.getIOOperationCallback(),
+                harness.getMetadataPageManagerFactory(), false);
+    }
+
+    @Override
+    protected Random getRandom() {
+        return harness.getRandom();
+    }
+
+    /**
+     * Creates and activates a fresh index, runs the scan scenario against it when the fields are
+     * integers or UTF-8 strings, then validates, deactivates and destroys the index. The key
+     * range arguments are not used by this test.
+     */
+    @Override
+    protected void runTest(ISerializerDeserializer[] fieldSerdes, int numKeys, BTreeLeafFrameType leafType,
+            ITupleReference lowKey, ITupleReference highKey, ITupleReference prefixLowKey,
+            ITupleReference prefixHighKey) throws Exception {
+        OrderedIndexTestContext ctx = createTestContext(fieldSerdes, numKeys, leafType, false);
+        ctx.getIndex().create();
+        ctx.getIndex().activate();
+        // We assume all fieldSerdes are of the same type. Check the first one
+        // to determine whether this test knows how to generate values for it.
+        ISerializerDeserializer firstSerde = fieldSerdes[0];
+        if (firstSerde instanceof IntegerSerializerDeserializer
+                || firstSerde instanceof UTF8StringSerializerDeserializer) {
+            test(ctx, fieldSerdes);
+        }
+
+        ctx.getIndex().validate();
+        ctx.getIndex().deactivate();
+        ctx.getIndex().destroy();
+    }
+
+    /**
+     * Builds three disk components and checks the exact sequence of tuples returned by the disk
+     * component scan: ordered by key, and for equal keys from component position 2 down to 0.
+     * The test asserts three immutable components right after the third flush request, so it
+     * relies on each scheduled flush having completed by then (presumably the harness uses a
+     * synchronous IO scheduler; confirm against LSMBTreeTestHarness).
+     *
+     * @param ctx the test context owning the index under test
+     * @param fieldSerdes serializers of the tuple fields, all assumed to be of the same type
+     * @throws HyracksDataException if an index operation or the scan fails
+     */
+    protected void test(OrderedIndexTestContext ctx, ISerializerDeserializer[] fieldSerdes)
+            throws HyracksDataException {
+        ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
+
+        // component 2 contains 1 and 2
+        upsertTuple(ctx, fieldSerdes, getValue(1, fieldSerdes));
+        upsertTuple(ctx, fieldSerdes, getValue(2, fieldSerdes));
+        flush(accessor);
+
+        // component 1 contains 1 and -2
+        upsertTuple(ctx, fieldSerdes, getValue(1, fieldSerdes));
+        deleteTuple(ctx, fieldSerdes, getValue(2, fieldSerdes));
+        flush(accessor);
+
+        // component 0 contains 2 and 3
+        upsertTuple(ctx, fieldSerdes, getValue(3, fieldSerdes));
+        upsertTuple(ctx, fieldSerdes, getValue(2, fieldSerdes));
+        flush(accessor);
+
+        LSMBTree btree = (LSMBTree) ctx.getIndex();
+        Assert.assertEquals("Check disk components", 3, btree.getImmutableComponents().size());
+
+        IIndexCursor cursor = accessor.createSearchCursor(false);
+        try {
+            accessor.scanDiskComponents(cursor);
+
+            checkReturnedTuple(ctx, getNext(cursor), fieldSerdes, 2, false, getValue(1, fieldSerdes));
+            checkReturnedTuple(ctx, getNext(cursor), fieldSerdes, 1, false, getValue(1, fieldSerdes));
+
+            checkReturnedTuple(ctx, getNext(cursor), fieldSerdes, 2, false, getValue(2, fieldSerdes));
+            checkReturnedTuple(ctx, getNext(cursor), fieldSerdes, 1, true, getValue(2, fieldSerdes));
+            checkReturnedTuple(ctx, getNext(cursor), fieldSerdes, 0, false, getValue(2, fieldSerdes));
+
+            checkReturnedTuple(ctx, getNext(cursor), fieldSerdes, 0, false, getValue(3, fieldSerdes));
+
+            Assert.assertFalse(cursor.hasNext());
+        } finally {
+            // Close the cursor even when an assertion fails, so it is not left open while the
+            // caller goes on to validate, deactivate and destroy the index.
+            cursor.close();
+        }
+    }
+
+    /**
+     * Verifies one tuple returned by the scan.
+     *
+     * @param ctx the test context, used for the number of stored fields
+     * @param tuple the tuple returned by the scan cursor
+     * @param fieldSerdes serializers used to decode the stored fields
+     * @param componentPos expected value of field 0 (component position)
+     * @param antimatter expected value of field 1 (anti-matter flag)
+     * @param value the value every stored field is expected to deserialize to
+     * @throws HyracksDataException if a field cannot be deserialized
+     */
+    protected void checkReturnedTuple(OrderedIndexTestContext ctx, ITupleReference tuple,
+            ISerializerDeserializer[] fieldSerdes, int componentPos, boolean antimatter, Object value)
+            throws HyracksDataException {
+        int actualComponentPos = IntegerPointable.getInteger(tuple.getFieldData(0), tuple.getFieldStart(0));
+        Assert.assertEquals("Check returned component position", componentPos, actualComponentPos);
+
+        boolean actualAntiMatter = BooleanPointable.getBoolean(tuple.getFieldData(1), tuple.getFieldStart(1));
+        Assert.assertEquals("Check returned anti-matter flag", antimatter, actualAntiMatter);
+
+        // Skip the two header fields so that field i of originalTuple is stored field i.
+        int[] permutation = new int[ctx.getFieldCount()];
+        for (int i = 0; i < permutation.length; i++) {
+            permutation[i] = i + NUM_SCAN_HEADER_FIELDS;
+        }
+        PermutingTupleReference originalTuple = new PermutingTupleReference(permutation);
+        originalTuple.reset(tuple);
+
+        for (int i = 0; i < fieldSerdes.length; i++) {
+            ByteArrayInputStream inStream = new ByteArrayInputStream(originalTuple.getFieldData(i),
+                    originalTuple.getFieldStart(i), originalTuple.getFieldLength(i));
+            DataInput dataIn = new DataInputStream(inStream);
+            Object actualObj = fieldSerdes[i].deserialize(dataIn);
+            if (!actualObj.equals(value)) {
+                fail("Actual and expected fields do not match on field " + i + ".\nExpected: " + value + "\nActual : "
+                        + actualObj);
+            }
+        }
+    }
+
+    /**
+     * Asserts that the cursor has another tuple, advances it and returns that tuple.
+     */
+    protected ITupleReference getNext(IIndexCursor cursor) throws HyracksDataException {
+        Assert.assertTrue(cursor.hasNext());
+        cursor.next();
+        return cursor.getTuple();
+    }
+
+    @Override
+    protected String getTestOpName() {
+        return "Disk Components Scan";
+    }
+
+    /**
+     * Upserts a tuple whose {@code ctx.getFieldCount()} fields all carry {@code value}.
+     * Any failure is propagated: the scenario's assertions depend on every write having
+     * taken effect, so no error code is tolerated here.
+     */
+    protected void upsertTuple(OrderedIndexTestContext ctx, ISerializerDeserializer[] fieldSerdes, Object value)
+            throws HyracksDataException {
+        ctx.getIndexAccessor().upsert(buildTuple(fieldSerdes, ctx.getFieldCount(), value));
+    }
+
+    /**
+     * Deletes the entry whose {@code ctx.getKeyFieldCount()} key fields all carry {@code value}.
+     * Any failure is propagated: the scenario expects this delete to show up as an anti-matter
+     * tuple in the scan, so no error code is tolerated here.
+     */
+    protected void deleteTuple(OrderedIndexTestContext ctx, ISerializerDeserializer[] fieldSerdes, Object value)
+            throws HyracksDataException {
+        ctx.getIndexAccessor().delete(buildTuple(fieldSerdes, ctx.getKeyFieldCount(), value));
+    }
+
+    /**
+     * Converts a scenario value to the type matching the first serializer: unchanged for
+     * integers, its string form for UTF-8 strings, and {@code null} for any other serializer.
+     */
+    protected Object getValue(Object value, ISerializerDeserializer[] fieldSerdes) {
+        if (fieldSerdes[0] instanceof IntegerSerializerDeserializer) {
+            return value;
+        } else if (fieldSerdes[0] instanceof UTF8StringSerializerDeserializer) {
+            return String.valueOf(value);
+        } else {
+            return null;
+        }
+    }
+
+    // Requests a flush of the accessor's index with a no-op IO callback.
+    private static void flush(ILSMIndexAccessor accessor) throws HyracksDataException {
+        accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback());
+    }
+
+    // Serializes value into each of the first numFields fields of a new tuple.
+    private static ArrayTupleReference buildTuple(ISerializerDeserializer[] fieldSerdes, int numFields,
+            Object value) throws HyracksDataException {
+        ArrayTupleBuilder builder = new ArrayTupleBuilder(numFields);
+        for (int i = 0; i < numFields; i++) {
+            builder.addField(fieldSerdes[i], value);
+        }
+        ArrayTupleReference tuple = new ArrayTupleReference();
+        tuple.reset(builder.getFieldEndOffsets(), builder.getByteArray());
+        return tuple;
+    }
+}