Merge "Merge branch 'gerrit/trinity' into 'gerrit/goldfish'" into goldfish
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
index 915f3b3..3ab86c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -47,6 +47,11 @@
}
@Override
+ public void beforeExit(boolean success) throws HyracksDataException {
+ // No Op
+ }
+
+ @Override
public void close() throws IOException {
// No Op
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 072a9b6..9b8b103 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.runtime.operators;
+import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@@ -58,6 +60,7 @@
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
@@ -95,12 +98,13 @@
private final LSMTreeIndexAccessor[] lsmAccessorForUniqunessChecks;
private final IFrameOperationCallback[] frameOpCallbacks;
- private boolean flushedPartialTuples;
private final PermutingFrameTupleReference keyTuple;
private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
private final IntSet processedTuples = new IntOpenHashSet();
private final IntSet flushedTuples = new IntOpenHashSet();
private final SourceLocation sourceLoc;
+ private boolean flushedPartialTuples;
+ private IBatchController batchController;
public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, IIndexDataflowHelperFactory keyIndexHelperFactory,
@@ -284,6 +288,7 @@
searchPred = new RangePredicate(frameTuple, frameTuple, true, true, keySearchCmp, keySearchCmp, null, null);
appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
frameTuple = new FrameTupleReference();
+ batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
} catch (Throwable e) { // NOSONAR: Re-thrown
throw HyracksDataException.create(e);
}
@@ -305,7 +310,8 @@
LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx];
IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
IFrameTupleProcessor processor = processors[pIdx];
- lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue());
+ lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController,
+ p2tuplesMapEntry.getValue());
}
writeBuffer.ensureFrameSize(buffer.capacity());
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 8bc2b1c..dd9f4070d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.runtime.operators;
+import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -71,6 +73,7 @@
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
@@ -140,11 +143,12 @@
private final ITracer tracer;
private final long traceCategory;
private final ITupleProjector tupleProjector;
- private long lastRecordInTimeStamp = 0L;
private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
private final boolean hasSecondaries;
private final ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory;
private final int[] fieldPermutation;
+ private long lastRecordInTimeStamp = 0L;
+ private IBatchController batchController;
public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
@@ -362,6 +366,11 @@
}
@Override
+ public void beforeExit(boolean success) throws HyracksDataException {
+ callback.beforeExit(success);
+ }
+
+ @Override
public void close() throws IOException {
callback.close();
}
@@ -378,6 +387,7 @@
};
frameOpCallbacks[i].open();
}
+ batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
}
protected void resetSearchPredicate(int tupleIndex) {
@@ -437,7 +447,8 @@
LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx];
IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
IFrameTupleProcessor processor = processors[pIdx];
- lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue());
+ lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController,
+ p2tuplesMapEntry.getValue());
}
if (itemCount > 0) {
lastRecordInTimeStamp = System.currentTimeMillis();
@@ -624,4 +635,5 @@
}
}
}
+
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
new file mode 100644
index 0000000..40465c6
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+
+class StandardBatchController implements IBatchController {
+ static final IBatchController INSTANCE = new StandardBatchController();
+
+ private StandardBatchController() {
+ }
+
+ @Override
+ public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+ throws HyracksDataException {
+ lsmHarness.enter(ctx, LSMOperationType.MODIFICATION);
+ }
+
+ @Override
+ public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+ boolean batchSuccessful) throws HyracksDataException {
+ lsmHarness.exit(ctx, callback, batchSuccessful, LSMOperationType.MODIFICATION);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
index 0fc24c7..98215bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
@@ -63,7 +63,7 @@
* @param writer the FrameWriter to write to and flush
* @throws HyracksDataException
*/
- public default void flush(IFrameWriter writer) throws HyracksDataException {
+ default void flush(IFrameWriter writer) throws HyracksDataException {
write(writer, true);
writer.flush();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java
new file mode 100644
index 0000000..7e3d599
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.util;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@FunctionalInterface
+public interface HyracksThrowingAction {
+ void run() throws HyracksDataException; // NOSONAR
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 3ed17b1..ad5e919 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -288,6 +288,42 @@
}
}
+ @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
+ public static void tryHyracksWithCleanups(HyracksThrowingAction action, HyracksThrowingAction... cleanups)
+ throws HyracksDataException {
+ Throwable savedT = null;
+ boolean suppressedInterrupted = false;
+ try {
+ action.run();
+ } catch (Throwable t) {
+ savedT = t;
+ } finally {
+ for (HyracksThrowingAction cleanup : cleanups) {
+ try {
+ cleanup.run();
+ } catch (Throwable t) {
+ if (savedT != null) {
+ savedT.addSuppressed(t);
+ suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+ } else {
+ savedT = t;
+ }
+ }
+ }
+ }
+ if (savedT == null) {
+ return;
+ }
+ if (suppressedInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ if (savedT instanceof Error) {
+ throw (Error) savedT;
+ } else {
+ throw HyracksDataException.create(savedT);
+ }
+ }
+
public static void tryWithCleanupsUnchecked(Runnable action, Runnable... cleanups) {
Throwable savedT = null;
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index c27a7e6..6c073f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -172,13 +172,4 @@
++tupleCount;
IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
}
-
- /*
- * Always write and then flush to send out the message if exists
- */
- @Override
- public void flush(IFrameWriter writer) throws HyracksDataException {
- write(writer, true);
- writer.flush();
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
index 75c95b0..b77883d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
@@ -74,4 +74,18 @@
Map<String, Object> sharedMap = TaskUtil.getSharedMap(ctx, false);
return sharedMap == null ? null : (T) sharedMap.get(key);
}
+
+ /**
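+ * Gets a {@code T} object from the shared map of the task, or returns the default value
+ *
+ * @param key the key to look up in the task's shared map
+ * @param ctx the task context whose shared map is consulted
+ * @param defaultValue the value returned when the task has no shared map or the map has no mapping for the key
+ * @return the value associated with the key cast to T, or {@code defaultValue} when no such mapping exists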
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T getOrDefault(String key, IHyracksTaskContext ctx, T defaultValue) {
+ Map<String, T> sharedMap = (Map<String, T>) TaskUtil.getSharedMap(ctx, false);
+ return sharedMap == null ? defaultValue : sharedMap.getOrDefault(key, defaultValue);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
new file mode 100644
index 0000000..e061589
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IBatchController {
+ String KEY_BATCH_CONTROLLER = "BATCH_CONTROLLER";
+
+ void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+ throws HyracksDataException;
+
+ void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+ boolean batchSuccessful) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
index 1f89af2..2fbc0c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -28,14 +28,24 @@
public interface IFrameOperationCallback extends Closeable {
/**
* Called once processing the frame is done before calling nextFrame on the next IFrameWriter in
- * the pipeline
+ * the pipeline. If completing this frame also causes the component to be exited, this is
+ * called prior to {@link #beforeExit(boolean)}.
*
* @throws HyracksDataException
*/
void frameCompleted() throws HyracksDataException;
/**
- * Called when the task has failed.
+ * Called just prior to exiting the component on batch completion: not all batches may result
+ * in a component exit, depending on the decision of the {@link IBatchController}.
+ *
+ * @throws HyracksDataException
+ */
+ void beforeExit(boolean success) throws HyracksDataException;
+
+ /**
+ * Called when the batch processing, {@link #frameCompleted()} or {@link #beforeExit(boolean)}
+ * invocation has failed.
*
* @param th
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 214d9dc..dbf34c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -227,10 +227,17 @@
* the callback at the end of the frame
* @param tuples
* the indexes of tuples to process
+ * @param batchController
+ * the controller of the batch lifecycle
* @throws HyracksDataException
*/
void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
- IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples)
+ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController,
+ Set<Integer> tuples) throws HyracksDataException;
+
+ void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException;
+
+ void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, LSMOperationType op)
throws HyracksDataException;
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 2d04020..d019a08 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -692,15 +693,27 @@
lsmIndex.updateFilter(ctx, tuple);
}
- private void enter(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ @Override
+ public void enter(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException {
if (!lsmIndex.isMemoryComponentsAllocated()) {
lsmIndex.allocateMemoryComponents();
}
- getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false);
+ getAndEnterComponents(ctx, op, false);
}
- private void exit(ILSMIndexOperationContext ctx) throws HyracksDataException {
- getAndExitComponentsAndComplete(ctx, LSMOperationType.MODIFICATION);
+ @Override
+ public void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success,
+ LSMOperationType op) throws HyracksDataException {
+ try {
+ callback.beforeExit(success);
+ } catch (Throwable th) {
+ // TODO(mblow): we don't distinguish between the three distinct phases in which we can
+ // encounter failures in the callback API; we might need this eventually
+ callback.fail(th);
+ throw th;
+ } finally {
+ getAndExitComponentsAndComplete(ctx, op);
+ }
}
private void getAndExitComponentsAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op)
@@ -715,13 +728,15 @@
@Override
public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
- IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples)
- throws HyracksDataException {
+ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController,
+ Set<Integer> tuples) throws HyracksDataException {
processor.start();
- enter(ctx);
+ batchController.batchEnter(ctx, this, frameOpCallback);
+ boolean success = false;
try {
try {
processFrame(accessor, tuple, processor, tuples);
+ success = true;
frameOpCallback.frameCompleted();
} catch (Throwable th) {
processor.fail(th);
@@ -733,7 +748,7 @@
LOGGER.warn("Failed to process frame", e);
throw e;
} finally {
- exit(ctx);
+ batchController.batchExit(ctx, this, frameOpCallback, success);
ctx.logPerformanceCounters(accessor.getTupleCount());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index fb5984d..c768768 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -211,8 +212,9 @@
}
public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor,
- IFrameOperationCallback frameOpCallback, Set<Integer> tuples) throws HyracksDataException {
- lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, tuples);
+ IFrameOperationCallback frameOpCallback, IBatchController batchController, Set<Integer> tuples)
+ throws HyracksDataException {
+ lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, batchController, tuples);
}
@Override