Unexpose enter and exit components
Change-Id: Ib4ef7b432bbe6ac9cf2bbe9244cfe2b406f4cb93
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1778
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 40635c8..0a11684 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -59,6 +59,7 @@
import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -92,15 +93,16 @@
private IFrameOperationCallback frameOpCallback;
private final IFrameOperationCallbackFactory frameOpCallbackFactory;
private AbstractIndexModificationOperationCallback abstractModCallback;
- private final boolean hasSecondaries;
private final ISearchOperationCallbackFactory searchCallbackFactory;
+ private final IFrameTupleProcessor processor;
+ private LSMTreeIndexAccessor lsmAccessor;
public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
IModificationOperationCallbackFactory modCallbackFactory,
ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, ARecordType recordType,
int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory,
- IMissingWriterFactory missingWriterFactory, boolean hasSecondaries) throws HyracksDataException {
+ IMissingWriterFactory missingWriterFactory, final boolean hasSecondaries) throws HyracksDataException {
super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
modCallbackFactory, null);
this.key = new PermutingFrameTupleReference();
@@ -125,7 +127,61 @@
this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
}
- this.hasSecondaries = hasSecondaries;
+ processor = new IFrameTupleProcessor() {
+ @Override
+ public void process(ITupleReference tuple, int index) throws HyracksDataException {
+ try {
+ tb.reset();
+ boolean recordWasInserted = false;
+ boolean recordWasDeleted = false;
+ resetSearchPredicate(index);
+ if (isFiltered || hasSecondaries) {
+ lsmAccessor.search(cursor, searchPred);
+ if (cursor.hasNext()) {
+ cursor.next();
+ prevTuple = cursor.getTuple();
+ cursor.reset(); // end the search
+ appendFilterToPrevTuple();
+ appendPrevRecord();
+ appendPreviousMeta();
+ appendFilterToOutput();
+ } else {
+ appendPreviousTupleAsMissing();
+ }
+ } else {
+ searchCallback.before(key); // lock
+ appendPreviousTupleAsMissing();
+ }
+ if (isDeleteOperation(tuple, numOfPrimaryKeys)) {
+ // Only delete if it is a delete and not upsert
+ abstractModCallback.setOp(Operation.DELETE);
+ lsmAccessor.forceDelete(tuple);
+ recordWasDeleted = true;
+ } else {
+ abstractModCallback.setOp(Operation.UPSERT);
+ lsmAccessor.forceUpsert(tuple);
+ recordWasInserted = true;
+ }
+ if (isFiltered && prevTuple != null) {
+ // need to update the filter of the new component with the previous value
+ lsmAccessor.updateFilter(prevTuple);
+ }
+ writeOutput(index, recordWasInserted, recordWasDeleted);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void start() throws HyracksDataException {
+ lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
+ }
+
+ @Override
+ public void finish() throws HyracksDataException {
+ lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
+ }
+ };
}
// we have the permutation which has [pk locations, record location, optional:filter-location]
@@ -163,15 +219,24 @@
searchCallback = (LockThenSearchOperationCallback) searchCallbackFactory
.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
indexAccessor = index.createAccessor(abstractModCallback, searchCallback);
-
+ lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
cursor = indexAccessor.createSearchCursor(false);
frameTuple = new FrameTupleReference();
INcApplicationContext appCtx =
(INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
appCtx.getTransactionSubsystem().getLogManager());
- frameOpCallback =
- frameOpCallbackFactory.createFrameOperationCallback(ctx, (ILSMIndexAccessor) indexAccessor);
+ frameOpCallback = new IFrameOperationCallback() {
+ IFrameOperationCallback callback =
+ frameOpCallbackFactory.createFrameOperationCallback(ctx, (ILSMIndexAccessor) indexAccessor);
+
+ @Override
+ public void frameCompleted() throws HyracksDataException {
+ callback.frameCompleted();
+ appender.write(writer, true);
+ }
+ };
+
} catch (Exception e) {
indexHelper.close();
throw new HyracksDataException(e);
@@ -212,59 +277,7 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
- LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
- int tupleCount = accessor.getTupleCount();
- int i = 0;
- lsmAccessor.enter();
- try {
- while (i < tupleCount) {
- tb.reset();
- boolean recordWasInserted = false;
- boolean recordWasDeleted = false;
- tuple.reset(accessor, i);
- resetSearchPredicate(i);
- if (isFiltered || hasSecondaries) {
- lsmAccessor.search(cursor, searchPred);
- if (cursor.hasNext()) {
- cursor.next();
- prevTuple = cursor.getTuple();
- cursor.reset(); // end the search
- appendFilterToPrevTuple();
- appendPrevRecord();
- appendPreviousMeta();
- appendFilterToOutput();
- } else {
- appendPreviousTupleAsMissing();
- }
- } else {
- searchCallback.before(key); // lock
- appendPreviousTupleAsMissing();
- }
- if (isDeleteOperation(tuple, numOfPrimaryKeys)) {
- // Only delete if it is a delete and not upsert
- abstractModCallback.setOp(Operation.DELETE);
- lsmAccessor.forceDelete(tuple);
- recordWasDeleted = true;
- } else {
- abstractModCallback.setOp(Operation.UPSERT);
- lsmAccessor.forceUpsert(tuple);
- recordWasInserted = true;
- }
- if (isFiltered && prevTuple != null) {
- // need to update the filter of the new component with the previous value
- lsmAccessor.updateFilter(prevTuple);
- }
- writeOutput(i, recordWasInserted, recordWasDeleted);
- i++;
- }
- // callback here before calling nextFrame on the next operator
- frameOpCallback.frameCompleted();
- appender.write(writer, true);
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- } finally {
- lsmAccessor.exit();
- }
+ lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
}
private void appendFilterToOutput() throws IOException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
new file mode 100644
index 0000000..3fbe6cd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public interface IFrameTupleProcessor {
+
+ /**
+ * Called once per batch before starting the batch process
+ */
+ void start() throws HyracksDataException;
+
+ /**
+ * process the frame tuple with the frame index
+ *
+ * @param tuple
+ * the tuple
+ * @param index
+ * the index of the tuple in the frame
+ * @throws HyracksDataException
+ */
+ void process(ITupleReference tuple, int index) throws HyracksDataException;
+
+ /**
+ * Called once per batch before ending the batch process
+ */
+ void finish() throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index f9a4e80..0fe7e04 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -22,6 +22,8 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
@@ -197,26 +199,28 @@
throws HyracksDataException;
/**
- * Enter components for the operation
- *
- * @param ctx
- * @throws HyracksDataException
- */
- void enter(ILSMIndexOperationContext ctx) throws HyracksDataException;
-
- /**
- * Exits components for the operation
- *
- * @param ctx
- * @throws HyracksDataException
- */
- void exit(ILSMIndexOperationContext ctx) throws HyracksDataException;
-
- /**
* Update the filter with the value in the passed tuple
*
* @param ctx
* @throws HyracksDataException
*/
void updateFilter(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException;
+
+ /**
+ * Perform batch operation on all tuples in the passed frame tuple accessor
+ *
+ * @param ctx
+ * the operation ctx
+ * @param accessor
+ * the frame tuple accessor
+ * @param tuple
+ * the mutable tuple used to pass the tuple to the processor
+ * @param processor
+ * the tuple processor
+ * @param frameOpCallback
+ * the callback at the end of the frame
+ * @throws HyracksDataException
+ */
+ void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
+ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index f846c80..ad53e73 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -35,16 +35,6 @@
public interface ILSMIndexAccessor extends IIndexAccessor {
/**
- * Enter the memory component for modification
- */
- void enter() throws HyracksDataException;
-
- /**
- * Exit the memory component
- */
- void exit() throws HyracksDataException;
-
- /**
* Schedule a flush operation
*
* @param callback
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index e315858..1502706 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -29,9 +29,13 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
@@ -586,16 +590,14 @@
lsmIndex.updateFilter(ctx, tuple);
}
- @Override
- public void enter(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ private void enter(ILSMIndexOperationContext ctx) throws HyracksDataException {
if (!lsmIndex.isMemoryComponentsAllocated()) {
lsmIndex.allocateMemoryComponents();
}
getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false);
}
- @Override
- public void exit(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ private void exit(ILSMIndexOperationContext ctx) throws HyracksDataException {
getAndExitComponentsAndComplete(ctx, LSMOperationType.MODIFICATION);
}
@@ -608,4 +610,26 @@
exitAndComplete(ctx, op);
}
}
+
+ @Override
+ public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
+ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException {
+ processor.start();
+ enter(ctx);
+ try {
+ int tupleCount = accessor.getTupleCount();
+ int i = 0;
+ while (i < tupleCount) {
+ tuple.reset(accessor, i);
+ processor.process(tuple, i);
+ i++;
+ }
+ frameOpCallback.frameCompleted();
+ processor.finish();
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ } finally {
+ exit(ctx);
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index a41d2c0..20d71e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -23,15 +23,19 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
@@ -200,20 +204,13 @@
return cursorFactory.create(ctx);
}
- @Override
- public void enter() throws HyracksDataException {
- ctx.setOperation(IndexOperation.UPSERT);
- lsmHarness.enter(ctx);
- }
-
- @Override
- public void exit() throws HyracksDataException {
- ctx.setOperation(IndexOperation.UPSERT);
- lsmHarness.exit(ctx);
- }
-
public void updateFilter(ITupleReference tuple) throws HyracksDataException {
ctx.setOperation(IndexOperation.UPSERT);
lsmHarness.updateFilter(ctx, tuple);
}
+
+ public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor,
+ IFrameOperationCallback frameOpCallback) throws HyracksDataException {
+ lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index 2482103..fb5cc9f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -202,23 +202,4 @@
public void forceUpsert(ITupleReference tuple) throws HyracksDataException {
throw new UnsupportedOperationException("Upsert not supported by lsm inverted index.");
}
-
- /**
- * enter the memory component for modification
- */
- @Override
- public void enter() throws HyracksDataException {
- ctx.setOperation(IndexOperation.UPSERT);
- lsmHarness.enter(ctx);
- }
-
- /**
- * exit the memory component
- */
- @Override
- public void exit() throws HyracksDataException {
- ctx.setOperation(IndexOperation.UPSERT);
- lsmHarness.exit(ctx);
- }
-
}