Merge branch 'gerrit/neo'
Change-Id: I493e78c38f1a41557fdb4b960b58dbdc20b62104
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index f40628fb..0b80881 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -52,6 +52,7 @@
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.formats.FormatUtils;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
@@ -66,6 +67,8 @@
import org.apache.asterix.transaction.management.service.logging.LogReader;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
@@ -813,10 +816,10 @@
RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
filterFields == null ? 0 : filterFields.length, recordType, metaType);
// fix pk fields
- int diff = upsertOutRecDesc.getFieldCount() - primaryIndexInfo.rDesc.getFieldCount();
+ int start = 1 + (dataset.hasMetaPart() ? 2 : 1) + (filterFields == null ? 0 : filterFields.length);
int[] pkFieldsInCommitOp = new int[dataset.getPrimaryKeys().size()];
for (int i = 0; i < pkFieldsInCommitOp.length; i++) {
- pkFieldsInCommitOp[i] = diff + i;
+ pkFieldsInCommitOp[i] = start++;
}
CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), pkFieldsInCommitOp,
true, ctx.getTaskAttemptId().getTaskId().getPartition(), true);
@@ -827,19 +830,26 @@
private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor inputRecordDesc, Dataset dataset, int numFilterFields,
ARecordType itemType, ARecordType metaItemType) throws Exception {
- ITypeTraits[] outputTypeTraits =
- new ITypeTraits[inputRecordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
- ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount()
- + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+ // 1 boolean field at the beginning to indicate whether the operation was upsert or delete
+ int numOutFields = 1 + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields + inputRecordDesc.getFieldCount();
+ ITypeTraits[] outputTypeTraits = new ITypeTraits[numOutFields];
+ ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[numOutFields];
- // add the previous record first
+ ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
+ ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider();
int f = 0;
- outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+ // add the upsert indicator boolean field
+ outputSerDes[f] = serdeProvider.getSerializerDeserializer(BuiltinType.AINT8);
+ outputTypeTraits[f] = typeTraitProvider.getTypeTrait(BuiltinType.AINT8);
+ f++;
+ // add the previous record
+ outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType);
+ outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType);
f++;
// add the previous meta second
if (dataset.hasMetaPart()) {
- outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
- outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+ outputSerDes[f] = serdeProvider.getSerializerDeserializer(metaItemType);
+ outputTypeTraits[f] = typeTraitProvider.getTypeTrait(metaItemType);
f++;
}
// add the previous filter third
@@ -854,10 +864,8 @@
}
}
fieldIdx = i;
- outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
- .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
- outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
- .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+ outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+ outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
f++;
}
for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index 24379a3..c5292d3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -19,7 +19,6 @@
package org.apache.asterix.test.dataflow;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -45,6 +44,7 @@
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
+import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
import org.apache.asterix.test.common.TestHelper;
import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -53,18 +53,28 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeSearchCursor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.MultiComparator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -84,7 +94,7 @@
private static final boolean[] UNIQUE_META_FIELDS = null;
private static final int[] KEY_INDEXES = { 0 };
private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
- private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+ private static final List<Integer> KEY_INDICATORS_LIST = List.of(Index.RECORD_INDICATOR);
private static final int TOTAL_NUM_OF_RECORDS = 2000;
private static final int RECORDS_PER_COMPONENT = 1000;
private static final int DATASET_ID = 101;
@@ -102,6 +112,7 @@
private static IIndexDataflowHelper indexDataflowHelper;
private static ITransactionContext txnCtx;
private static LSMPrimaryInsertOperatorNodePushable insertOp;
+ private static LSMPrimaryUpsertOperatorNodePushable upsertOp;
@BeforeClass
public static void setUp() throws Exception {
@@ -143,6 +154,8 @@
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
KEY_INDICATORS_LIST, storageManager, null, null).getLeft();
+ upsertOp = nc.getUpsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+ KEY_INDICATORS_LIST, storageManager, null, false).getLeft();
}
@After
@@ -202,6 +215,72 @@
}
@Test
+    public void testCursorSwitchSucceedWithNoDuplicates() {
+        // Upserts the same SWITCH_COMPONENT_CYCLE + 2 records twice: first into a disk component (flushed),
+        // then into a memory component (not flushed). A search over the index then runs while the memory
+        // component is flushed mid-scan, so the cursor switches components; each record must be returned once.
+        try {
+            StorageTestUtils.allowAllOps(lsmBtree);
+            lsmBtree.clearSearchCallbacks();
+            RecordTupleGenerator tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES,
+                    KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+            int totalNumRecords = LSMIndexSearchCursor.SWITCH_COMPONENT_CYCLE + 2;
+            // keep private copies of the generated tuples so the same records can be upserted twice
+            // (presumably the generator reuses the tuple it returns -- TODO confirm)
+            ITupleReference[] upsertTuples = new ITupleReference[totalNumRecords];
+            for (int j = 0; j < totalNumRecords; j++) {
+                upsertTuples[j] = TupleUtils.copyTuple(tupleGenerator.next());
+            }
+
+            // upsert and flush the tuples to create a disk component
+            upsert(tupleAppender, totalNumRecords, upsertTuples, true);
+            // upsert the same tuples but don't flush them, so they stay in a memory component
+            upsert(tupleAppender, totalNumRecords, upsertTuples, false);
+
+            // range search with null low and high keys
+            ILSMIndexAccessor accessor = new LSMTreeIndexAccessor(lsmBtree.getHarness(),
+                    lsmBtree.createOpContext(NoOpIndexAccessParameters.INSTANCE), LSMBTreeSearchCursor::new);
+            IIndexCursor searchCursor = accessor.createSearchCursor(false);
+            MultiComparator lowKeySearchCmp =
+                    BTreeUtils.getSearchMultiComparator(lsmBtree.getComparatorFactories(), null);
+            MultiComparator highKeySearchCmp =
+                    BTreeUtils.getSearchMultiComparator(lsmBtree.getComparatorFactories(), null);
+            RangePredicate rangePredicate =
+                    new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp, null, null);
+
+            int count = 0;
+            Throwable failure = null;
+            try {
+                accessor.search(searchCursor, rangePredicate);
+                while (searchCursor.hasNext()) {
+                    searchCursor.next();
+                    count++;
+                    // flush the memory component; the cursor switches to the new disk component at the switch cycle
+                    if (count == 1) {
+                        StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+                    }
+                }
+            } catch (Throwable th) {
+                failure = th;
+            }
+            // release the cursor and the accessor even when the search itself failed
+            failure = ResourceReleaseUtils.close(searchCursor, failure);
+            failure = CleanupUtils.destroy(failure, searchCursor, accessor);
+            // report a search/cleanup failure first; the record count is meaningless after one
+            if (failure != null) {
+                throw failure;
+            }
+            Assert.assertEquals("Records count not matching", totalNumRecords, count);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+ @Test
public void testCursorSwitchFails() {
try {
// allow all operations
@@ -268,4 +338,35 @@
emptyTupleOp.close();
Assert.assertEquals(numOfRecords, countOp.getCount());
}
+
+    /**
+     * Pushes the first {@code totalNumRecords} tuples through the shared upsert pipeline ({@code upsertOp}),
+     * optionally flushing the index while the operator is still open.
+     *
+     * @param tupleAppender appender used to batch the tuples into frames for {@code upsertOp}
+     * @param totalNumRecords number of leading entries of {@code upsertTuples} to upsert
+     * @param upsertTuples tuples to upsert
+     * @param flush whether to flush the index (creating a disk component) before closing the operator
+     * @throws Exception if adding, writing or flushing fails; {@code upsertOp} is failed and closed first
+     */
+    private void upsert(FrameTupleAppender tupleAppender, int totalNumRecords, ITupleReference[] upsertTuples,
+            boolean flush) throws Exception {
+        upsertOp.open();
+        try {
+            for (int j = 0; j < totalNumRecords; j++) {
+                DataflowUtils.addTupleToFrame(tupleAppender, upsertTuples[j], upsertOp);
+            }
+            // write out whatever the appender still holds
+            tupleAppender.write(upsertOp, true);
+            if (flush) {
+                StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+            }
+        } catch (Throwable th) {
+            // signal the failure to the pipeline before it is closed below
+            upsertOp.fail();
+            throw th;
+        } finally {
+            // the operator was opened successfully, so it must always be closed
+            upsertOp.close();
+        }
+    }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 514682b..968416c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -246,10 +246,15 @@
rangeCursors[i].close();
btreeAccessors[i].reset(btree, iap);
btreeAccessors[i].search(rangeCursors[i], reusablePred);
- pushIntoQueueFromCursorAndReplaceThisElement(switchedElements[i]);
+ // consume the element that we restarted the search at since before the switch it was consumed
+ if (rangeCursors[i].hasNext()) {
+ rangeCursors[i].next();
+ switchedElements[i].reset(rangeCursors[i].getTuple());
+ }
}
}
switchRequest[i] = false;
+ switchedElements[i] = null;
// any failed switch makes further switches pointless
switchPossible = switchPossible && operationalComponents.get(i).getType() == LSMComponentType.DISK;
}
@@ -274,14 +279,18 @@
if (replaceFrom < 0) {
replaceFrom = i;
}
- // we return the outputElement to the priority queue if it came from this component
+
+ PriorityQueueElement element;
if (outputElement != null && outputElement.getCursorIndex() == i) {
- pushIntoQueueFromCursorAndReplaceThisElement(outputElement);
- needPushElementIntoQueue = false;
- outputElement = null;
- canCallProceed = true;
+ // there should be no element from this cursor in the queue since the element was polled
+ if (findElement(outputPriorityQueue, i) != null) {
+ throw new IllegalStateException("found element in the queue from the cursor of output element");
+ }
+ element = outputElement;
+ } else {
+ element = findElement(outputPriorityQueue, i);
}
- PriorityQueueElement element = remove(outputPriorityQueue, i);
+
// if this cursor is still active (has an element)
// then we copy the search key to restart the operation after
// replacing the component
@@ -341,6 +350,18 @@
return null;
}
+    /**
+     * Finds, without removing it, the queued element that came from cursor {@code cursorIndex}; null if none.
+     */
+    private PriorityQueueElement findElement(PriorityQueue<PriorityQueueElement> outputPriorityQueue, int cursorIndex) {
+        for (PriorityQueueElement candidate : outputPriorityQueue) {
+            if (candidate.getCursorIndex() == cursorIndex) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
@Override
public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
index 70f6d9e..5b0363a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
@@ -193,7 +193,7 @@
for (int i = 0; i < count; i++) {
ILSMComponent removed = ctx.getComponentHolder().remove(swapIndexes[i]);
if (removed.getType() == LSMComponentType.MEMORY) {
- LOGGER.info("Removed a memory component from the search operation");
+ LOGGER.debug("Removed memory component {} from the search operation", removed);
} else {
throw new IllegalStateException("Disk components can't be removed from the search operation");
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index b6f6e26..27875c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -40,7 +40,7 @@
import org.apache.hyracks.storage.common.MultiComparator;
public abstract class LSMIndexSearchCursor extends EnforcedIndexCursor implements ILSMIndexCursor {
- protected static final int SWITCH_COMPONENT_CYCLE = 100;
+ public static final int SWITCH_COMPONENT_CYCLE = 100;
protected final ILSMIndexOperationContext opCtx;
protected final boolean returnDeletedTuples;
protected PriorityQueueElement outputElement;
@@ -119,6 +119,7 @@
needPushElementIntoQueue = false;
for (int i = 0; i < switchRequest.length; i++) {
switchRequest[i] = false;
+ switchedElements[i] = null;
}
try {
if (outputPriorityQueue != null) {