Merge branch 'gerrit/neo' into 'gerrit/trinity'
Ext-ref: MB-64229
Change-Id: Ic3fd8f7abd64252ad2473b049a0d222747a43857
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
index 915f3b3..3ab86c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -47,6 +47,11 @@
}
@Override
+ public void beforeExit(boolean success) throws HyracksDataException {
+ // No Op
+ }
+
+ @Override
public void close() throws IOException {
// No Op
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index eadb614..5c87994 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.runtime.operators;
+import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+
import java.nio.ByteBuffer;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -33,6 +35,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.api.util.HyracksThrowingAction;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -50,6 +53,7 @@
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
@@ -77,6 +81,7 @@
private boolean flushedPartialTuples;
private int currentTupleIdx;
private int lastFlushedTupleIdx;
+ private IBatchController batchController;
private final PermutingFrameTupleReference keyTuple;
@@ -116,6 +121,8 @@
protected IFrameTupleProcessor createTupleProcessor(SourceLocation sourceLoc) {
return new IFrameTupleProcessor() {
+ private HyracksThrowingAction exitAction;
+
@Override
public void process(ITupleReference tuple, int index) throws HyracksDataException {
if (index < currentTupleIdx) {
@@ -219,6 +226,7 @@
(INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
appCtx.getTransactionSubsystem().getLogManager());
+ batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
} catch (Throwable e) { // NOSONAR: Re-thrown
throw HyracksDataException.create(e);
}
@@ -227,7 +235,7 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
- lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
+ lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController);
writeBuffer.ensureFrameSize(buffer.capacity());
if (flushedPartialTuples) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 3b8ee68..d2dc3cf 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.runtime.operators;
+import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -62,6 +64,7 @@
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
@@ -117,6 +120,7 @@
private final ITracer tracer;
private final long traceCategory;
private long lastRecordInTimeStamp = 0L;
+ private IBatchController batchController;
public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
@@ -289,6 +293,11 @@
}
@Override
+ public void beforeExit(boolean success) throws HyracksDataException {
+ callback.beforeExit(success);
+ }
+
+ @Override
public void close() throws IOException {
callback.close();
}
@@ -304,6 +313,7 @@
}
};
frameOpCallback.open();
+ batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
} catch (Throwable e) { // NOSONAR: Re-thrown
throw HyracksDataException.create(e);
}
@@ -344,7 +354,7 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int itemCount = accessor.getTupleCount();
- lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
+ lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController);
if (itemCount > 0) {
lastRecordInTimeStamp = System.currentTimeMillis();
}
@@ -484,4 +494,5 @@
public void flush() throws HyracksDataException {
// No op since nextFrame flushes by default
}
+
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
new file mode 100644
index 0000000..40465c6
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+
+class StandardBatchController implements IBatchController {
+ static final IBatchController INSTANCE = new StandardBatchController();
+
+ private StandardBatchController() {
+ }
+
+ @Override
+ public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+ throws HyracksDataException {
+ lsmHarness.enter(ctx, LSMOperationType.MODIFICATION);
+ }
+
+ @Override
+ public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+ boolean batchSuccessful) throws HyracksDataException {
+ lsmHarness.exit(ctx, callback, batchSuccessful, LSMOperationType.MODIFICATION);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
index 0fc24c7..98215bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
@@ -63,7 +63,7 @@
* @param writer the FrameWriter to write to and flush
* @throws HyracksDataException
*/
- public default void flush(IFrameWriter writer) throws HyracksDataException {
+ default void flush(IFrameWriter writer) throws HyracksDataException {
write(writer, true);
writer.flush();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java
new file mode 100644
index 0000000..7e3d599
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.util;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@FunctionalInterface
+public interface HyracksThrowingAction {
+ void run() throws HyracksDataException; // NOSONAR
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index c50c6b5..abe62c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -287,6 +287,42 @@
}
}
+ @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
+ public static void tryHyracksWithCleanups(HyracksThrowingAction action, HyracksThrowingAction... cleanups)
+ throws HyracksDataException {
+ Throwable savedT = null;
+ boolean suppressedInterrupted = false;
+ try {
+ action.run();
+ } catch (Throwable t) {
+ savedT = t;
+ } finally {
+ for (HyracksThrowingAction cleanup : cleanups) {
+ try {
+ cleanup.run();
+ } catch (Throwable t) {
+ if (savedT != null) {
+ savedT.addSuppressed(t);
+ suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+ } else {
+ savedT = t;
+ }
+ }
+ }
+ }
+ if (savedT == null) {
+ return;
+ }
+ if (suppressedInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ if (savedT instanceof Error) {
+ throw (Error) savedT;
+ } else {
+ throw HyracksDataException.create(savedT);
+ }
+ }
+
/**
* Runs the supplied action, after suspending any pending interruption. An error will be logged if
* the action is itself interrupted.
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index c27a7e6..6c073f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -172,13 +172,4 @@
++tupleCount;
IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
}
-
- /*
- * Always write and then flush to send out the message if exists
- */
- @Override
- public void flush(IFrameWriter writer) throws HyracksDataException {
- write(writer, true);
- writer.flush();
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
index 75c95b0..b77883d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
@@ -74,4 +74,18 @@
Map<String, Object> sharedMap = TaskUtil.getSharedMap(ctx, false);
return sharedMap == null ? null : (T) sharedMap.get(key);
}
+
+ /**
+ * Gets a {@code T} object from the shared map of the task, or the default value if there is no map or no mapping
+ *
+ * @param key
+ * @param ctx
+ * @param defaultValue
+ * @return the value associated with the key cast to T, or defaultValue if the key is not mapped
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T getOrDefault(String key, IHyracksTaskContext ctx, T defaultValue) {
+ Map<String, T> sharedMap = (Map<String, T>) TaskUtil.getSharedMap(ctx, false);
+ return sharedMap == null ? defaultValue : sharedMap.getOrDefault(key, defaultValue);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
new file mode 100644
index 0000000..e061589
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IBatchController {
+ String KEY_BATCH_CONTROLLER = "BATCH_CONTROLLER";
+
+ void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+ throws HyracksDataException;
+
+ void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+ boolean batchSuccessful) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
index 1f89af2..2fbc0c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -28,14 +28,24 @@
public interface IFrameOperationCallback extends Closeable {
/**
* Called once processing the frame is done before calling nextFrame on the next IFrameWriter in
- * the pipeline
+ * the pipeline. In the event this frame completion will also exit the component, this will be
+ * called prior to {@link #beforeExit(boolean)}.
*
* @throws HyracksDataException
*/
void frameCompleted() throws HyracksDataException;
/**
- * Called when the task has failed.
+ * Called just prior to exiting the component on batch completion: not all batches may result
+ * in a component exit, depending on the decision of the {@link IBatchController}.
+ *
+ * @throws HyracksDataException
+ */
+ void beforeExit(boolean success) throws HyracksDataException;
+
+ /**
+ * Called when the batch processing, {@link #frameCompleted()} or {@link #beforeExit(boolean)}
+ * invocation has failed.
*
* @param th
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 9e8c568..68de45a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -39,7 +39,6 @@
* @param tuple
* the operation tuple
* @throws HyracksDataException
- * @throws IndexException
*/
void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException;
@@ -54,7 +53,6 @@
* the operation tuple
* @return
* @throws HyracksDataException
- * @throws IndexException
*/
boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
throws HyracksDataException;
@@ -69,7 +67,6 @@
* @param pred
* the search predicate
* @throws HyracksDataException
- * @throws IndexException
*/
void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException;
@@ -104,9 +101,7 @@
* Schedule a merge
*
* @param ctx
- * @param callback
* @throws HyracksDataException
- * @throws IndexException
*/
ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -114,9 +109,7 @@
* Schedule full merge
*
* @param ctx
- * @param callback
* @throws HyracksDataException
- * @throws IndexException
*/
ILSMIOOperation scheduleFullMerge(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -125,7 +118,6 @@
*
* @param operation
* @throws HyracksDataException
- * @throws IndexException
*/
void merge(ILSMIOOperation operation) throws HyracksDataException;
@@ -133,7 +125,6 @@
* Schedule a flush
*
* @param ctx
- * @param callback
* @throws HyracksDataException
*/
ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -143,7 +134,6 @@
*
* @param operation
* @throws HyracksDataException
- * @throws IndexException
*/
void flush(ILSMIOOperation operation) throws HyracksDataException;
@@ -153,7 +143,6 @@
* @param ioOperation
* the io operation that added the new component
* @throws HyracksDataException
- * @throws IndexException
*/
void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException;
@@ -225,20 +214,22 @@
/**
* Perform batch operation on all tuples in the passed frame tuple accessor
*
- * @param ctx
- * the operation ctx
- * @param accessor
- * the frame tuple accessor
- * @param tuple
- * the mutable tuple used to pass the tuple to the processor
- * @param processor
- * the tuple processor
- * @param frameOpCallback
- * the callback at the end of the frame
+ * @param ctx the operation ctx
+ * @param accessor the frame tuple accessor
+ * @param tuple the mutable tuple used to pass the tuple to the processor
+ * @param processor the tuple processor
+ * @param frameOpCallback the callback at the end of the frame
+ * @param batchController the controller whose batchEnter/batchExit calls bracket the frame processing
* @throws HyracksDataException
*/
void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
- IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException;
+ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController)
+ throws HyracksDataException;
+
+ void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException;
+
+ void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, LSMOperationType op)
+ throws HyracksDataException;
/**
* Rollback components that match the passed predicate
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 717bcce..8ad67f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -33,6 +33,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -688,15 +689,27 @@
lsmIndex.updateFilter(ctx, tuple);
}
- private void enter(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ @Override
+ public void enter(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException {
if (!lsmIndex.isMemoryComponentsAllocated()) {
lsmIndex.allocateMemoryComponents();
}
- getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false);
+ getAndEnterComponents(ctx, op, false);
}
- private void exit(ILSMIndexOperationContext ctx) throws HyracksDataException {
- getAndExitComponentsAndComplete(ctx, LSMOperationType.MODIFICATION);
+ @Override
+ public void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success,
+ LSMOperationType op) throws HyracksDataException {
+ try {
+ callback.beforeExit(success);
+ } catch (Throwable th) {
+ // TODO(mblow): we don't distinguish between the three distinct phases we can encounter
+ // failures in the callback API- we might need this eventually
+ callback.fail(th);
+ throw th;
+ } finally {
+ getAndExitComponentsAndComplete(ctx, op);
+ }
}
private void getAndExitComponentsAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op)
@@ -711,12 +724,15 @@
@Override
public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
- IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException {
+ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController)
+ throws HyracksDataException {
processor.start();
- enter(ctx);
+ batchController.batchEnter(ctx, this, frameOpCallback);
+ boolean success = false;
try {
try {
processFrame(accessor, tuple, processor);
+ success = true;
frameOpCallback.frameCompleted();
} catch (Throwable th) {
processor.fail(th);
@@ -728,7 +744,7 @@
LOGGER.warn("Failed to process frame", e);
throw e;
} finally {
- exit(ctx);
+ batchController.batchExit(ctx, this, frameOpCallback, success);
ctx.logPerformanceCounters(accessor.getTupleCount());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 8412b8c..e688727 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -210,8 +211,8 @@
}
public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor,
- IFrameOperationCallback frameOpCallback) throws HyracksDataException {
- lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback);
+ IFrameOperationCallback frameOpCallback, IBatchController batchController) throws HyracksDataException {
+ lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, batchController);
}
@Override