Merge branch 'gerrit/trinity' into 'gerrit/morpheus'
Change-Id: I64ace35c05e7226bd9965537e05ac6cc8fad648d
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java
index d2ac8e5..378c758 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java
@@ -398,14 +398,16 @@
if (!pushableNestedSubplan) {
return false;
}
-
+ Set<ILogicalOperator> visited = new HashSet<>();
for (int i = 0; i < nspOp.getNestedPlans().size(); i++) {
Mutable<ILogicalOperator> nspAggRef = nspOp.getNestedPlans().get(i).getRoots().get(0);
AggregateOperator nspAgg = (AggregateOperator) nspAggRef.getValue();
Mutable<ILogicalOperator> nspAggChildRef = nspAgg.getInputs().get(0);
LogicalVariable listifyVar = findListifiedVariable(nspAgg, varFromNestedAgg);
if (listifyVar != null) {
- OperatorManipulationUtil.substituteVarRec(aggInSubplanOp, unnestVar, listifyVar, true, context);
+ OperatorManipulationUtil.substituteVarRec(aggInSubplanOp, unnestVar, listifyVar, true, context,
+ visited);
+ visited.clear();
nspAgg.getVariables().addAll(aggInSubplanOp.getVariables());
nspAgg.getExpressions().addAll(aggInSubplanOp.getExpressions());
for (LogicalVariable v : aggInSubplanOp.getVariables()) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushGroupByThroughProduct.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushGroupByThroughProduct.java
index 6d92b51..28166a9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushGroupByThroughProduct.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushGroupByThroughProduct.java
@@ -120,12 +120,14 @@
AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) opRefJoin.getValue();
gby.getDecorList().clear();
gby.getDecorList().addAll(decorToPush);
+ Set<ILogicalOperator> visited = new HashSet<>();
for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorNotToPush) {
LogicalVariable v1 = p.first;
if (v1 != null) {
VariableReferenceExpression varRef = (VariableReferenceExpression) p.second.getValue();
LogicalVariable v2 = varRef.getVariableReference();
- OperatorManipulationUtil.substituteVarRec(join, v2, v1, true, context);
+ OperatorManipulationUtil.substituteVarRec(join, v2, v1, true, context, visited);
+ visited.clear();
}
}
Mutable<ILogicalOperator> branchRef = join.getInputs().get(branch);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
index 915f3b3..3ab86c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -47,6 +47,11 @@
}
@Override
+ public void beforeExit(boolean success) throws HyracksDataException {
+ // No Op
+ }
+
+ @Override
public void close() throws IOException {
// No Op
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index eadb614..5c87994 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.runtime.operators;
+import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+
import java.nio.ByteBuffer;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -33,6 +35,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.api.util.HyracksThrowingAction;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -50,6 +53,7 @@
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
@@ -77,6 +81,7 @@
private boolean flushedPartialTuples;
private int currentTupleIdx;
private int lastFlushedTupleIdx;
+ private IBatchController batchController;
private final PermutingFrameTupleReference keyTuple;
@@ -116,6 +121,8 @@
protected IFrameTupleProcessor createTupleProcessor(SourceLocation sourceLoc) {
return new IFrameTupleProcessor() {
+ private HyracksThrowingAction exitAction;
+
@Override
public void process(ITupleReference tuple, int index) throws HyracksDataException {
if (index < currentTupleIdx) {
@@ -219,6 +226,7 @@
(INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
appCtx.getTransactionSubsystem().getLogManager());
+ batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
} catch (Throwable e) { // NOSONAR: Re-thrown
throw HyracksDataException.create(e);
}
@@ -227,7 +235,7 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
- lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
+ lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController);
writeBuffer.ensureFrameSize(buffer.capacity());
if (flushedPartialTuples) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 3b8ee68..d2dc3cf 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.runtime.operators;
+import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -62,6 +64,7 @@
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
@@ -117,6 +120,7 @@
private final ITracer tracer;
private final long traceCategory;
private long lastRecordInTimeStamp = 0L;
+ private IBatchController batchController;
public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
@@ -289,6 +293,11 @@
}
@Override
+ public void beforeExit(boolean success) throws HyracksDataException {
+ callback.beforeExit(success);
+ }
+
+ @Override
public void close() throws IOException {
callback.close();
}
@@ -304,6 +313,7 @@
}
};
frameOpCallback.open();
+ batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE);
} catch (Throwable e) { // NOSONAR: Re-thrown
throw HyracksDataException.create(e);
}
@@ -344,7 +354,7 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int itemCount = accessor.getTupleCount();
- lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
+ lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController);
if (itemCount > 0) {
lastRecordInTimeStamp = System.currentTimeMillis();
}
@@ -484,4 +494,5 @@
public void flush() throws HyracksDataException {
// No op since nextFrame flushes by default
}
+
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
new file mode 100644
index 0000000..40465c6
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+
+class StandardBatchController implements IBatchController {
+ static final IBatchController INSTANCE = new StandardBatchController();
+
+ private StandardBatchController() {
+ }
+
+ @Override
+ public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+ throws HyracksDataException {
+ lsmHarness.enter(ctx, LSMOperationType.MODIFICATION);
+ }
+
+ @Override
+ public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+ boolean batchSuccessful) throws HyracksDataException {
+ lsmHarness.exit(ctx, callback, batchSuccessful, LSMOperationType.MODIFICATION);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 2413ed2..f8fb6c2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -181,17 +181,24 @@
}
public static void substituteVarRec(AbstractLogicalOperator op, LogicalVariable v1, LogicalVariable v2,
- boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {
+ boolean goThroughNts, ITypingContext ctx, Set<ILogicalOperator> visited) throws AlgebricksException {
VariableUtilities.substituteVariables(op, v1, v2, goThroughNts, ctx);
for (Mutable<ILogicalOperator> opRef2 : op.getInputs()) {
- substituteVarRec((AbstractLogicalOperator) opRef2.getValue(), v1, v2, goThroughNts, ctx);
+ if (visited.contains(opRef2.getValue())) {
+ continue;
+ }
+ visited.add(opRef2.getValue());
+ substituteVarRec((AbstractLogicalOperator) opRef2.getValue(), v1, v2, goThroughNts, ctx, visited);
}
if (op.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE && goThroughNts) {
NestedTupleSourceOperator nts = (NestedTupleSourceOperator) op;
if (nts.getDataSourceReference() != null) {
AbstractLogicalOperator op2 =
(AbstractLogicalOperator) nts.getDataSourceReference().getValue().getInputs().get(0).getValue();
- substituteVarRec(op2, v1, v2, goThroughNts, ctx);
+ if (!visited.contains(op2)) {
+ visited.add(op2);
+ substituteVarRec(op2, v1, v2, goThroughNts, ctx, visited);
+ }
}
}
if (op.hasNestedPlans()) {
@@ -199,7 +206,11 @@
for (ILogicalPlan p : aonp.getNestedPlans()) {
for (Mutable<ILogicalOperator> ref : p.getRoots()) {
AbstractLogicalOperator aop = (AbstractLogicalOperator) ref.getValue();
- substituteVarRec(aop, v1, v2, goThroughNts, ctx);
+ if (visited.contains(aop)) {
+ continue;
+ }
+ visited.add(aop);
+ substituteVarRec(aop, v1, v2, goThroughNts, ctx, visited);
}
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java
index 55c8400..dbdf6c4 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java
@@ -92,6 +92,7 @@
protected void buildVarExprList(Collection<LogicalVariable> vars, IOptimizationContext context, GroupByOperator g,
List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> outVeList) throws AlgebricksException {
SourceLocation sourceLoc = g.getSourceLocation();
+ Set<ILogicalOperator> visited = new HashSet<>();
for (LogicalVariable ov : vars) {
LogicalVariable newVar = context.newVar();
VariableReferenceExpression varExpr = new VariableReferenceExpression(newVar);
@@ -101,7 +102,8 @@
for (ILogicalPlan p : g.getNestedPlans()) {
for (Mutable<ILogicalOperator> r : p.getRoots()) {
OperatorManipulationUtil.substituteVarRec((AbstractLogicalOperator) r.getValue(), ov, newVar, true,
- context);
+ context, visited);
+ visited.clear();
}
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
index 391d03a..36cad77 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
@@ -331,6 +331,7 @@
List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> outVeList) throws AlgebricksException {
SourceLocation sourceLoc = g.getSourceLocation();
Map<LogicalVariable, LogicalVariable> m = new HashMap<LogicalVariable, LogicalVariable>();
+ Set<ILogicalOperator> visited = new HashSet<>();
for (LogicalVariable ov : vars) {
LogicalVariable newVar = context.newVar();
ILogicalExpression varExpr = new VariableReferenceExpression(newVar);
@@ -340,11 +341,13 @@
for (ILogicalPlan p : g.getNestedPlans()) {
for (Mutable<ILogicalOperator> r : p.getRoots()) {
OperatorManipulationUtil.substituteVarRec((AbstractLogicalOperator) r.getValue(), ov, newVar, true,
- context);
+ context, visited);
+ visited.clear();
}
}
AbstractLogicalOperator opUnder = (AbstractLogicalOperator) g.getInputs().get(0).getValue();
- OperatorManipulationUtil.substituteVarRec(opUnder, ov, newVar, true, context);
+ OperatorManipulationUtil.substituteVarRec(opUnder, ov, newVar, true, context, visited);
+ visited.clear();
m.put(ov, newVar);
}
return m;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
index 0fc24c7..98215bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java
@@ -63,7 +63,7 @@
* @param writer the FrameWriter to write to and flush
* @throws HyracksDataException
*/
- public default void flush(IFrameWriter writer) throws HyracksDataException {
+ default void flush(IFrameWriter writer) throws HyracksDataException {
write(writer, true);
writer.flush();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java
new file mode 100644
index 0000000..7e3d599
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.util;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@FunctionalInterface
+public interface HyracksThrowingAction {
+ void run() throws HyracksDataException; // NOSONAR
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index c50c6b5..abe62c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -287,6 +287,42 @@
}
}
+ @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
+ public static void tryHyracksWithCleanups(HyracksThrowingAction action, HyracksThrowingAction... cleanups)
+ throws HyracksDataException {
+ Throwable savedT = null;
+ boolean suppressedInterrupted = false;
+ try {
+ action.run();
+ } catch (Throwable t) {
+ savedT = t;
+ } finally {
+ for (HyracksThrowingAction cleanup : cleanups) {
+ try {
+ cleanup.run();
+ } catch (Throwable t) {
+ if (savedT != null) {
+ savedT.addSuppressed(t);
+ suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+ } else {
+ savedT = t;
+ }
+ }
+ }
+ }
+ if (savedT == null) {
+ return;
+ }
+ if (suppressedInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ if (savedT instanceof Error) {
+ throw (Error) savedT;
+ } else {
+ throw HyracksDataException.create(savedT);
+ }
+ }
+
/**
* Runs the supplied action, after suspending any pending interruption. An error will be logged if
* the action is itself interrupted.
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index c27a7e6..6c073f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -172,13 +172,4 @@
++tupleCount;
IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
}
-
- /*
- * Always write and then flush to send out the message if exists
- */
- @Override
- public void flush(IFrameWriter writer) throws HyracksDataException {
- write(writer, true);
- writer.flush();
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
index 75c95b0..b77883d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
@@ -74,4 +74,18 @@
Map<String, Object> sharedMap = TaskUtil.getSharedMap(ctx, false);
return sharedMap == null ? null : (T) sharedMap.get(key);
}
+
+ /**
+ * get a <T> object from the shared map of the task, or returns the default value
+ *
+ * @param key
+ * @param ctx
+ * @param defaultValue
+ * @return the value associated with the key casted as T
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T getOrDefault(String key, IHyracksTaskContext ctx, T defaultValue) {
+ Map<String, T> sharedMap = (Map<String, T>) TaskUtil.getSharedMap(ctx, false);
+ return sharedMap == null ? defaultValue : sharedMap.getOrDefault(key, defaultValue);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
new file mode 100644
index 0000000..e061589
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IBatchController {
+ String KEY_BATCH_CONTROLLER = "BATCH_CONTROLLER";
+
+ void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback)
+ throws HyracksDataException;
+
+ void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback,
+ boolean batchSuccessful) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
index 1f89af2..2fbc0c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -28,14 +28,24 @@
public interface IFrameOperationCallback extends Closeable {
/**
* Called once processing the frame is done before calling nextFrame on the next IFrameWriter in
- * the pipeline
+ * the pipeline. In the event this frame completion will also exit the component, this will be
+ * called prior to {@link #beforeExit(boolean)}.
*
* @throws HyracksDataException
*/
void frameCompleted() throws HyracksDataException;
/**
- * Called when the task has failed.
+ * Called just prior to exiting the component on batch completion: not all batches may result
+ * in a component exit, depending on the decision of the {@link IBatchController}.
+ *
+ * @throws HyracksDataException
+ */
+ void beforeExit(boolean success) throws HyracksDataException;
+
+ /**
+ * Called when the batch processing, {@link #frameCompleted()} or {@link #beforeExit(boolean)}
+ * invocation has failed.
*
* @param th
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 9e8c568..68de45a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -39,7 +39,6 @@
* @param tuple
* the operation tuple
* @throws HyracksDataException
- * @throws IndexException
*/
void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException;
@@ -54,7 +53,6 @@
* the operation tuple
* @return
* @throws HyracksDataException
- * @throws IndexException
*/
boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
throws HyracksDataException;
@@ -69,7 +67,6 @@
* @param pred
* the search predicate
* @throws HyracksDataException
- * @throws IndexException
*/
void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException;
@@ -104,9 +101,7 @@
* Schedule a merge
*
* @param ctx
- * @param callback
* @throws HyracksDataException
- * @throws IndexException
*/
ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -114,9 +109,7 @@
* Schedule full merge
*
* @param ctx
- * @param callback
* @throws HyracksDataException
- * @throws IndexException
*/
ILSMIOOperation scheduleFullMerge(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -125,7 +118,6 @@
*
* @param operation
* @throws HyracksDataException
- * @throws IndexException
*/
void merge(ILSMIOOperation operation) throws HyracksDataException;
@@ -133,7 +125,6 @@
* Schedule a flush
*
* @param ctx
- * @param callback
* @throws HyracksDataException
*/
ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -143,7 +134,6 @@
*
* @param operation
* @throws HyracksDataException
- * @throws IndexException
*/
void flush(ILSMIOOperation operation) throws HyracksDataException;
@@ -153,7 +143,6 @@
* @param ioOperation
* the io operation that added the new component
* @throws HyracksDataException
- * @throws IndexException
*/
void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException;
@@ -225,20 +214,22 @@
/**
* Perform batch operation on all tuples in the passed frame tuple accessor
*
- * @param ctx
- * the operation ctx
- * @param accessor
- * the frame tuple accessor
- * @param tuple
- * the mutable tuple used to pass the tuple to the processor
- * @param processor
- * the tuple processor
- * @param frameOpCallback
- * the callback at the end of the frame
+ * @param ctx the operation ctx
+ * @param accessor the frame tuple accessor
+ * @param tuple the mutable tuple used to pass the tuple to the processor
+ * @param processor the tuple processor
+ * @param frameOpCallback the callback at the end of the frame
+ * @param batchController
* @throws HyracksDataException
*/
void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
- IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException;
+ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController)
+ throws HyracksDataException;
+
+ void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException;
+
+ void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, LSMOperationType op)
+ throws HyracksDataException;
/**
* Rollback components that match the passed predicate
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 717bcce..8ad67f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -33,6 +33,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -688,15 +689,27 @@
lsmIndex.updateFilter(ctx, tuple);
}
- private void enter(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ @Override
+ public void enter(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException {
if (!lsmIndex.isMemoryComponentsAllocated()) {
lsmIndex.allocateMemoryComponents();
}
- getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false);
+ getAndEnterComponents(ctx, op, false);
}
- private void exit(ILSMIndexOperationContext ctx) throws HyracksDataException {
- getAndExitComponentsAndComplete(ctx, LSMOperationType.MODIFICATION);
+ @Override
+ public void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success,
+ LSMOperationType op) throws HyracksDataException {
+ try {
+ callback.beforeExit(success);
+ } catch (Throwable th) {
+ // TODO(mblow): we don't distinguish between the three distinct phases we can encounter
+ // failures in the callback API- we might need this eventually
+ callback.fail(th);
+ throw th;
+ } finally {
+ getAndExitComponentsAndComplete(ctx, op);
+ }
}
private void getAndExitComponentsAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op)
@@ -711,12 +724,15 @@
@Override
public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
- IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException {
+ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController)
+ throws HyracksDataException {
processor.start();
- enter(ctx);
+ batchController.batchEnter(ctx, this, frameOpCallback);
+ boolean success = false;
try {
try {
processFrame(accessor, tuple, processor);
+ success = true;
frameOpCallback.frameCompleted();
} catch (Throwable th) {
processor.fail(th);
@@ -728,7 +744,7 @@
LOGGER.warn("Failed to process frame", e);
throw e;
} finally {
- exit(ctx);
+ batchController.batchExit(ctx, this, frameOpCallback, success);
ctx.logPerformanceCounters(accessor.getTupleCount());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 8412b8c..e688727 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -210,8 +211,8 @@
}
public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor,
- IFrameOperationCallback frameOpCallback) throws HyracksDataException {
- lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback);
+ IFrameOperationCallback frameOpCallback, IBatchController batchController) throws HyracksDataException {
+ lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, batchController);
}
@Override