Let SPLIT operator work as expected
- Let SPLIT operator function as expected in the optimization framework
by referencing the information for the REPLICATE operator
Change-Id: I999288ea4cf286e34d735a840843bf161876d3e3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1542
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
index f883687..852c392 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
@@ -100,4 +100,13 @@
return createPropagatingAllInputsTypeEnvironment(ctx);
}
+ public boolean isBlocker() {
+ for (boolean requiresMaterialization : outputMaterializationFlags) {
+ if (requiresMaterialization) {
+ return true;
+ }
+ }
+ return false;
+ }
+
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
index 2d2fd0f..0499327 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
@@ -42,12 +42,4 @@
return visitor.visitReplicateOperator(this, arg);
}
- public boolean isBlocker() {
- for (boolean requiresMaterialization : outputMaterializationFlags) {
- if (requiresMaterialization) {
- return true;
- }
- }
- return false;
- }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
index a996673..7be761b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
@@ -32,10 +32,22 @@
// Expression that keeps the output branch information for each tuple
private final Mutable<ILogicalExpression> branchingExpression;
+ // Default branch when there is no value from the given branching expression. The default is 0.
+ private final int defaultBranch;
+ // When the following is set to true, defaultBranch will be ignored and incoming tuples will be
+ // propagated to all output branches. The default is false.
+ private final boolean propageToAllBranchAsDefault;
public SplitOperator(int outputArity, Mutable<ILogicalExpression> branchingExpression) {
+ this(outputArity, branchingExpression, 0, false);
+ }
+
+ public SplitOperator(int outputArity, Mutable<ILogicalExpression> branchingExpression, int defaultBranch,
+ boolean propageToAllBranchForMissingExprValue) {
super(outputArity);
this.branchingExpression = branchingExpression;
+ this.defaultBranch = defaultBranch;
+ this.propageToAllBranchAsDefault = propageToAllBranchForMissingExprValue;
}
@Override
@@ -52,6 +64,14 @@
return branchingExpression;
}
+ public int getDefaultBranch() {
+ return defaultBranch;
+ }
+
+ public boolean getPropageToAllBranchAsDefault() {
+ return propageToAllBranchAsDefault;
+ }
+
@Override
public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
return visitor.transform(branchingExpression);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
index 3b8aaab..923e56a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
@@ -47,6 +47,8 @@
throws AlgebricksException {
SplitOperator sop = (SplitOperator) op;
int outputArity = sop.getOutputArity();
+ int defaultBranch = sop.getDefaultBranch();
+ boolean propageToAllBranchAsDefault = sop.getPropageToAllBranchAsDefault();
IOperatorDescriptorRegistry spec = builder.getJobSpec();
RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
@@ -59,7 +61,7 @@
IBinaryIntegerInspectorFactory intInsepctorFactory = context.getBinaryIntegerInspectorFactory();
SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity,
- brachingExprEvalFactory, intInsepctorFactory);
+ brachingExprEvalFactory, intInsepctorFactory, defaultBranch, propageToAllBranchAsDefault);
contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
ILogicalOperator src = op.getInputs().get(0).getValue();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index 1a61f2e..2960903 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -25,14 +25,13 @@
import java.util.Map.Entry;
import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.api.job.IJobletEventListenerFactory;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.api.job.JobSpecification;
@@ -107,8 +106,9 @@
Mutable<ILogicalOperator> child = entry.getKey();
List<Mutable<ILogicalOperator>> parents = entry.getValue();
if (parents.size() > 1) {
- if (child.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
- ReplicateOperator rop = (ReplicateOperator) child.getValue();
+ if (child.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE
+ || child.getValue().getOperatorTag() == LogicalOperatorTag.SPLIT) {
+ AbstractReplicateOperator rop = (AbstractReplicateOperator) child.getValue();
if (rop.isBlocker()) {
// make the order of the graph edges consistent with the order of rop's outputs
List<Mutable<ILogicalOperator>> outputs = rop.getOutputs();
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
index 60275dd..f51c9ea 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
@@ -146,9 +146,10 @@
}
}
- // TODO: Deal with replicate properly. Currently, we just clear the expr equivalence map, since we want to avoid incorrect expression replacement
- // (the resulting new variables should be assigned live below a replicate).
- if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+ // TODO: Deal with replicate properly. Currently, we just clear the expr equivalence map,
+ // since we want to avoid incorrect expression replacement
+ // (the resulting new variables should be assigned live below a replicate/split).
+ if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE || op.getOperatorTag() == LogicalOperatorTag.SPLIT) {
exprEqClassMap.clear();
return modified;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 474cc73..5a4cacd 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -459,9 +459,10 @@
private void computeClusters(Mutable<ILogicalOperator> parentRef, Mutable<ILogicalOperator> opRef,
MutableInt currentClusterId) {
- // only replicate operator has multiple outputs
+ // only replicate or split operator has multiple outputs
int outputIndex = 0;
- if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+ if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE
+ || opRef.getValue().getOperatorTag() == LogicalOperatorTag.SPLIT) {
ReplicateOperator rop = (ReplicateOperator) opRef.getValue();
List<Mutable<ILogicalOperator>> outputs = rop.getOutputs();
for (outputIndex = 0; outputIndex < outputs.size(); outputIndex++) {
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
index 2d57e8d..88c0ea9 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
@@ -87,6 +87,7 @@
|| op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE
|| op2.getOperatorTag() == LogicalOperatorTag.PROJECT
|| op2.getOperatorTag() == LogicalOperatorTag.REPLICATE
+ || op2.getOperatorTag() == LogicalOperatorTag.SPLIT
|| op2.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
return new Pair<Boolean, Boolean>(false, false);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java
index aab6d12..29998c2 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java
@@ -58,7 +58,7 @@
LogicalOperatorTag tag2 = op2.getOperatorTag();
if (tag2 == LogicalOperatorTag.INNERJOIN || tag2 == LogicalOperatorTag.LEFTOUTERJOIN
- || tag2 == LogicalOperatorTag.REPLICATE) {
+ || tag2 == LogicalOperatorTag.REPLICATE || tag2 == LogicalOperatorTag.SPLIT) {
return false;
} else { // not a join
boolean res = propagateSelectionRec(opRef, opRef2);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 6fdcfdf..7dc59f6 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -73,6 +73,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.SplitPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
@@ -236,6 +237,9 @@
op.setPhysicalOperator(new ReplicatePOperator());
break;
}
+ case SPLIT:
+ op.setPhysicalOperator(new SplitPOperator());
+ break;
case SCRIPT: {
op.setPhysicalOperator(new StringStreamingScriptPOperator());
break;
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
index be39208..508b1aa 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
@@ -49,14 +49,24 @@
public class SplitOperatorDescriptor extends AbstractReplicateOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private IScalarEvaluatorFactory brachingExprEvalFactory;
- private IBinaryIntegerInspectorFactory intInsepctorFactory;
+ private final IScalarEvaluatorFactory brachingExprEvalFactory;
+ private final IBinaryIntegerInspectorFactory intInsepctorFactory;
+ private final int defaultBranch;
+ private final boolean propageToAllBranchAsDefault;
public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity,
IScalarEvaluatorFactory brachingExprEvalFactory, IBinaryIntegerInspectorFactory intInsepctorFactory) {
+ this(spec, rDesc, outputArity, brachingExprEvalFactory, intInsepctorFactory, 0, false);
+ }
+
+ public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity,
+ IScalarEvaluatorFactory brachingExprEvalFactory, IBinaryIntegerInspectorFactory intInsepctorFactory,
+ int defaultBranch, boolean propageToAllBranchAsDefault) {
super(spec, rDesc, outputArity);
this.brachingExprEvalFactory = brachingExprEvalFactory;
this.intInsepctorFactory = intInsepctorFactory;
+ this.defaultBranch = defaultBranch;
+ this.propageToAllBranchAsDefault = propageToAllBranchAsDefault;
}
@Override
@@ -91,8 +101,7 @@
final FrameTupleAppender[] appenders = new FrameTupleAppender[numberOfNonMaterializedOutputs];
final FrameTupleReference tRef = new FrameTupleReference();;
final IBinaryIntegerInspector intInsepctor = intInsepctorFactory.createBinaryIntegerInspector(ctx);
- final IScalarEvaluator eval;
- eval = brachingExprEvalFactory.createScalarEvaluator(ctx);
+ final IScalarEvaluator eval = brachingExprEvalFactory.createScalarEvaluator(ctx);
for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
appenders[i] = new FrameTupleAppender(new VSizeFrame(ctx), true);
}
@@ -112,17 +121,40 @@
accessor.reset(bufferAccessor);
int tupleCount = accessor.getTupleCount();
// The output branch number that starts from 0.
- int outputBranch;
+ int outputBranch = defaultBranch;
+ boolean correctBranchValue;
for (int i = 0; i < tupleCount; i++) {
// Get the output branch number from the field in the given tuple.
tRef.reset(accessor, i);
eval.evaluate(tRef, p);
- outputBranch = intInsepctor.getIntegerValue(p.getByteArray(), p.getStartOffset(),
- p.getLength());
+ correctBranchValue = true;
+
+ try {
+ outputBranch =
+ intInsepctor.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength());
+
+ if (outputBranch < 0 || outputBranch >= outputArity) {
+ correctBranchValue = false;
+ }
+ } catch (Exception e) {
+ correctBranchValue = false;
+ }
// Add this tuple to the correct output frame.
- FrameUtils.appendToWriter(writers[outputBranch], appenders[outputBranch], accessor, i);
+ if (correctBranchValue) {
+ FrameUtils.appendToWriter(writers[outputBranch], appenders[outputBranch], accessor, i);
+ } else {
+ // Need to propagate to the all branches?
+ if (!propageToAllBranchAsDefault) {
+ FrameUtils.appendToWriter(writers[outputBranch], appenders[outputBranch], accessor, i);
+ } else {
+ for (int j = 0; j < outputArity; j++) {
+ FrameUtils.appendToWriter(writers[j], appenders[j], accessor, i);
+ }
+ }
+ }
+
}
}