Index-only plan step 2: Added SplitOperator
- Introduced SplitOperator that sends a tuple to only one output frame unlike the ReplicateOperator
that propagates a tuple into all outputs frames.
- Removed PartitioningSplitOperator and PartitioningSplitOperatorDescriptor that are not functional
(lacking physical operator)
- Added a unit test case of SplitOperatorDescriptor in PushRuntimeTest.
Change-Id: Ice190827513cd8632764b52c9d0338d65c830740
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1196
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
index e04be6f..4a79387 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
@@ -44,19 +44,19 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -197,15 +197,12 @@
}
@Override
- public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException {
- for (Mutable<ILogicalExpression> expr : op.getExpressions()) {
- sweepExpression(expr.getValue(), op);
- }
+ public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
return null;
}
@Override
- public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
return null;
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
index 143cf0b..874cc7c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
@@ -26,8 +26,8 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.om.base.AString;
@@ -67,18 +67,18 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -508,13 +508,12 @@
}
@Override
- public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
- throws AlgebricksException {
+ public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
- public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
index c7b927e..eeb2c2a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
@@ -53,12 +53,12 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -276,13 +276,12 @@
}
@Override
- public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
- throws AlgebricksException {
+ public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
@Override
- public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
return visitSingleInputOperator(op);
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
index ef0f9da..ccf0aeb 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
@@ -33,17 +33,17 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -153,12 +153,12 @@
}
@Override
- public Boolean visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException {
- return false;
+ public Boolean visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ return visitInputs(op);
}
@Override
- public Boolean visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ public Boolean visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
return visitInputs(op);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index d39a8c8..cc7a75f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -38,13 +38,13 @@
MATERIALIZE,
NESTEDTUPLESOURCE,
ORDER,
- PARTITIONINGSPLIT,
PROJECT,
REPLICATE,
RUNNINGAGGREGATE,
SCRIPT,
SELECT,
SINK,
+ SPLIT,
SUBPLAN,
TOKENIZE,
UNIONALL,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 893fb32..1d20e08 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -46,13 +46,13 @@
NESTED_LOOP,
NESTED_TUPLE_SOURCE,
ONE_TO_ONE_EXCHANGE,
- PARTITIONINGSPLIT,
PRE_CLUSTERED_GROUP_BY,
PRE_SORTED_DISTINCT_BY,
RANDOM_PARTITION_EXCHANGE,
RANDOM_MERGE_EXCHANGE,
RANGE_PARTITION_EXCHANGE,
RANGE_PARTITION_MERGE_EXCHANGE,
+ REPLICATE,
RTREE_SEARCH,
RUNNING_AGGREGATE,
SINGLE_PARTITION_INVERTED_INDEX_SEARCH,
@@ -60,7 +60,7 @@
SINK_WRITE,
SORT_GROUP_BY,
SORT_MERGE_EXCHANGE,
- REPLICATE,
+ SPLIT,
STABLE_SORT,
STATS,
STREAM_LIMIT,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
new file mode 100644
index 0000000..f883687
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * Abstract class for two replication related operator - replicate and split
+ * Replicate operator propagates all frames to all output branches.
+ * That is, each tuple will be propagated to all output branches.
+ * Split operator propagates each tuple in a frame to one output branch only.
+ */
+public abstract class AbstractReplicateOperator extends AbstractLogicalOperator {
+
+ private int outputArity;
+ protected boolean[] outputMaterializationFlags;
+ private List<Mutable<ILogicalOperator>> outputs;
+
+ public AbstractReplicateOperator(int outputArity) {
+ this.outputArity = outputArity;
+ this.outputMaterializationFlags = new boolean[outputArity];
+ this.outputs = new ArrayList<>();
+ }
+
+ public AbstractReplicateOperator(int outputArity, boolean[] outputMaterializationFlags) {
+ this.outputArity = outputArity;
+ this.outputMaterializationFlags = outputMaterializationFlags;
+ this.outputs = new ArrayList<>();
+ }
+
+ @Override
+ public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+ throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public VariablePropagationPolicy getVariablePropagationPolicy() {
+ return VariablePropagationPolicy.ALL;
+ }
+
+ @Override
+ public boolean isMap() {
+ return true;
+ }
+
+ @Override
+ public void recomputeSchema() {
+ schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
+ }
+
+ public void substituteVar(LogicalVariable v1, LogicalVariable v2) {
+ // do nothing
+ }
+
+ public int getOutputArity() {
+ return outputArity;
+ }
+
+ public void setOutputMaterializationFlags(boolean[] outputMaterializationFlags) {
+ this.outputMaterializationFlags = outputMaterializationFlags;
+ }
+
+ public boolean[] getOutputMaterializationFlags() {
+ return outputMaterializationFlags;
+ }
+
+ public List<Mutable<ILogicalOperator>> getOutputs() {
+ return outputs;
+ }
+
+ @Override
+ public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+ return createPropagatingAllInputsTypeEnvironment(ctx);
+ }
+
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/PartitioningSplitOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/PartitioningSplitOperator.java
deleted file mode 100644
index 1c5f324..0000000
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/PartitioningSplitOperator.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-import org.apache.hyracks.algebricks.runtime.operators.std.PartitioningSplitOperatorDescriptor;
-
-/**
- * Partitions it's input based on a given list of expressions.
- * Each expression is assumed to return true/false,
- * and there is exactly one output branch per expression (optionally, plus one default branch).
- * For each input tuple, the expressions are evaluated one-by-one,
- * and the tuple is written to first output branch whose corresponding
- * expression evaluates to true.
- * If all expressions evaluate to false, then
- * the tuple is written to the default output branch, if any was given.
- * If no output branch was given, then such tuples are simply dropped.
- * Given N expressions there may be N or N+1 output branches because the default output branch may be separate from the regular output branches.
- */
-public class PartitioningSplitOperator extends AbstractLogicalOperator {
-
- private final List<Mutable<ILogicalExpression>> expressions;
- private final int defaultBranchIndex;
-
- public PartitioningSplitOperator(List<Mutable<ILogicalExpression>> expressions, int defaultBranchIndex) throws AlgebricksException {
- this.expressions = expressions;
- this.defaultBranchIndex = defaultBranchIndex;
- // Check that the default output branch index is in [0, N], where N is the number of expressions.
- if (defaultBranchIndex != PartitioningSplitOperatorDescriptor.NO_DEFAULT_BRANCH
- && defaultBranchIndex > expressions.size()) {
- throw new AlgebricksException("Default branch index out of bounds. Number of exprs given: "
- + expressions.size() + ". The maximum default branch index may therefore be: " + expressions.size());
- }
- }
-
- public List<Mutable<ILogicalExpression>> getExpressions() {
- return expressions;
- }
-
- public int getDefaultBranchIndex() {
- return defaultBranchIndex;
- }
-
- public int getNumOutputBranches() {
- return (defaultBranchIndex == expressions.size()) ? expressions.size() + 1 : expressions.size();
- }
-
- @Override
- public LogicalOperatorTag getOperatorTag() {
- return LogicalOperatorTag.PARTITIONINGSPLIT;
- }
-
- @Override
- public void recomputeSchema() {
- schema = new ArrayList<LogicalVariable>();
- schema.addAll(inputs.get(0).getValue().getSchema());
- }
-
- @Override
- public VariablePropagationPolicy getVariablePropagationPolicy() {
- return VariablePropagationPolicy.ALL;
- }
-
- @Override
- public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
- boolean b = false;
- for (int i = 0; i < expressions.size(); i++) {
- if (visitor.transform(expressions.get(i))) {
- b = true;
- }
- }
- return b;
- }
-
- @Override
- public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
- return visitor.visitPartitioningSplitOperator(this, arg);
- }
-
- @Override
- public boolean isMap() {
- return false;
- }
-
- @Override
- public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
- return createPropagatingAllInputsTypeEnvironment(ctx);
- }
-
-}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
index 834107c..2d2fd0f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
@@ -18,36 +18,18 @@
*/
package org.apache.hyracks.algebricks.core.algebra.operators.logical;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-public class ReplicateOperator extends AbstractLogicalOperator {
-
- private int outputArity;
- private boolean[] outputMaterializationFlags;
- private List<Mutable<ILogicalOperator>> outputs;
+public class ReplicateOperator extends AbstractReplicateOperator {
public ReplicateOperator(int outputArity) {
- this.outputArity = outputArity;
- this.outputMaterializationFlags = new boolean[outputArity];
- this.outputs = new ArrayList<>();
+ super(outputArity);
}
public ReplicateOperator(int outputArity, boolean[] outputMaterializationFlags) {
- this.outputArity = outputArity;
- this.outputMaterializationFlags = outputMaterializationFlags;
- this.outputs = new ArrayList<>();
+ super(outputArity, outputMaterializationFlags);
}
@Override
@@ -60,52 +42,6 @@
return visitor.visitReplicateOperator(this, arg);
}
- @Override
- public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
- throws AlgebricksException {
- return false;
- }
-
- @Override
- public VariablePropagationPolicy getVariablePropagationPolicy() {
- return VariablePropagationPolicy.ALL;
- }
-
- @Override
- public boolean isMap() {
- return true;
- }
-
- @Override
- public void recomputeSchema() {
- schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
- }
-
- public void substituteVar(LogicalVariable v1, LogicalVariable v2) {
- // do nothing
- }
-
- public int getOutputArity() {
- return outputArity;
- }
-
- public void setOutputMaterializationFlags(boolean[] outputMaterializationFlags) {
- this.outputMaterializationFlags = outputMaterializationFlags;
- }
-
- public boolean[] getOutputMaterializationFlags() {
- return outputMaterializationFlags;
- }
-
- public List<Mutable<ILogicalOperator>> getOutputs() {
- return outputs;
- }
-
- @Override
- public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
- return createPropagatingAllInputsTypeEnvironment(ctx);
- }
-
public boolean isBlocker() {
for (boolean requiresMaterialization : outputMaterializationFlags) {
if (requiresMaterialization) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
new file mode 100644
index 0000000..a996673
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * Split Operator receives an expression. This expression will be evaluated to an integer value during the runtime.
+ * Based on its value, it propagates each tuple to the corresponding output frame. (e.g., first output = 0, ...)
+ * Thus, unlike Replicate operator that does unconditional propagation to all outputs,
+ * this does a conditional propagate operation.
+ */
+public class SplitOperator extends AbstractReplicateOperator {
+
+ // Expression that keeps the output branch information for each tuple
+ private final Mutable<ILogicalExpression> branchingExpression;
+
+ public SplitOperator(int outputArity, Mutable<ILogicalExpression> branchingExpression) {
+ super(outputArity);
+ this.branchingExpression = branchingExpression;
+ }
+
+ @Override
+ public LogicalOperatorTag getOperatorTag() {
+ return LogicalOperatorTag.SPLIT;
+ }
+
+ @Override
+ public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+ return visitor.visitSplitOperator(this, arg);
+ }
+
+ public Mutable<ILogicalExpression> getBranchingExpression() {
+ return branchingExpression;
+ }
+
+ @Override
+ public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+ return visitor.transform(branchingExpression);
+ }
+
+ @Override
+ public void substituteVar(LogicalVariable v1, LogicalVariable v2) {
+ getBranchingExpression().getValue().substituteVar(v1, v2);
+ }
+
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index b999493..d278078 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -48,13 +48,13 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -175,12 +175,12 @@
}
@Override
- public Long visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException {
+ public Long visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
return op.getInputs().get(0).getValue().accept(this, arg);
}
@Override
- public Long visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ public Long visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
return op.getInputs().get(0).getValue().accept(this, arg);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 5e2a041..b259869 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -30,7 +30,6 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.common.utils.ListSet;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
@@ -67,13 +66,13 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -405,12 +404,6 @@
}
@Override
- public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, IOptimizationContext ctx)
- throws AlgebricksException {
- throw new NotImplementedException();
- }
-
- @Override
public Void visitProjectOperator(ProjectOperator op, IOptimizationContext ctx) throws AlgebricksException {
propagateFDsAndEquivClassesForUsedVars(op, ctx, op.getVariables());
return null;
@@ -423,6 +416,12 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, IOptimizationContext ctx) throws AlgebricksException {
+ propagateFDsAndEquivClasses(op, ctx);
+ return null;
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext ctx) throws AlgebricksException {
propagateFDsAndEquivClasses(op, ctx);
return null;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index c0e8b34..7f34e8b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -55,13 +55,13 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -277,18 +277,6 @@
}
@Override
- public Boolean visitPartitioningSplitOperator(PartitioningSplitOperator op, ILogicalOperator arg)
- throws AlgebricksException {
- AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
- if (aop.getOperatorTag() != LogicalOperatorTag.PARTITIONINGSPLIT) {
- return Boolean.FALSE;
- }
- PartitioningSplitOperator partitionOpArg = (PartitioningSplitOperator) copyAndSubstituteVar(op, arg);
- boolean isomorphic = compareExpressions(op.getExpressions(), partitionOpArg.getExpressions());
- return isomorphic;
- }
-
- @Override
public Boolean visitReplicateOperator(ReplicateOperator op, ILogicalOperator arg) throws AlgebricksException {
AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
if (aop.getOperatorTag() != LogicalOperatorTag.REPLICATE) {
@@ -298,6 +286,17 @@
}
@Override
+ public Boolean visitSplitOperator(SplitOperator op, ILogicalOperator arg) throws AlgebricksException {
+ AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+ if (aop.getOperatorTag() != LogicalOperatorTag.SPLIT) {
+ return Boolean.FALSE;
+ }
+ SplitOperator sOpArg = (SplitOperator) copyAndSubstituteVar(op, arg);
+ boolean isomorphic = op.getBranchingExpression().getValue().equals(sOpArg.getBranchingExpression().getValue());
+ return isomorphic;
+ }
+
+ @Override
public Boolean visitMaterializeOperator(MaterializeOperator op, ILogicalOperator arg) throws AlgebricksException {
AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
if (aop.getOperatorTag() != LogicalOperatorTag.MATERIALIZE) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 965008c..58b31f8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -22,8 +22,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -57,13 +57,13 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -165,14 +165,13 @@
}
@Override
- public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, ILogicalOperator arg)
- throws AlgebricksException {
+ public Void visitReplicateOperator(ReplicateOperator op, ILogicalOperator arg) throws AlgebricksException {
mapVariablesStandard(op, arg);
return null;
}
@Override
- public Void visitReplicateOperator(ReplicateOperator op, ILogicalOperator arg) throws AlgebricksException {
+ public Void visitSplitOperator(SplitOperator op, ILogicalOperator arg) throws AlgebricksException {
mapVariablesStandard(op, arg);
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 4241d84..f4b3195 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -52,18 +52,18 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
@@ -384,14 +384,6 @@
}
@Override
- public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, ILogicalOperator arg)
- throws AlgebricksException {
- PartitioningSplitOperator opCopy = new PartitioningSplitOperator(
- exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getExpressions()), op.getDefaultBranchIndex());
- return opCopy;
- }
-
- @Override
public ILogicalOperator visitProjectOperator(ProjectOperator op, ILogicalOperator arg) throws AlgebricksException {
ProjectOperator opCopy = new ProjectOperator(deepCopyVariableList(op.getVariables()));
deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
@@ -410,6 +402,13 @@
}
@Override
+ public ILogicalOperator visitSplitOperator(SplitOperator op, ILogicalOperator arg) throws AlgebricksException {
+ SplitOperator opCopy = new SplitOperator(op.getOutputArity(), op.getBranchingExpression());
+ deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+ return opCopy;
+ }
+
+ @Override
public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, ILogicalOperator arg)
throws AlgebricksException {
MaterializeOperator opCopy = new MaterializeOperator();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 8d3644d..7e92869 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -47,13 +47,13 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -174,13 +174,6 @@
}
@Override
- public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, IOptimizationContext arg)
- throws AlgebricksException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
public Void visitProjectOperator(ProjectOperator op, IOptimizationContext context) throws AlgebricksException {
propagateCardinalityAndFrameNumber(op, context);
return null;
@@ -193,6 +186,11 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, IOptimizationContext arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext arg) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index fde6a28..442899f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -52,13 +52,14 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -66,7 +67,6 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -167,16 +167,13 @@
}
@Override
- public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
- throws AlgebricksException {
- ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
- deepCopyExpressionRefs(newExpressions, op.getExpressions());
- return new PartitioningSplitOperator(newExpressions, op.getDefaultBranchIndex());
+ public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ return new ReplicateOperator(op.getOutputArity());
}
@Override
- public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
- return new ReplicateOperator(op.getOutputArity());
+ public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+ return new SplitOperator(op.getOutputArity(), op.getBranchingExpression());
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
index f01b20f..9f1acea 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
@@ -49,13 +49,13 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -174,13 +174,12 @@
}
@Override
- public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, IOptimizationContext ctx)
- throws AlgebricksException {
+ public Void visitReplicateOperator(ReplicateOperator op, IOptimizationContext ctx) throws AlgebricksException {
return null;
}
@Override
- public Void visitReplicateOperator(ReplicateOperator op, IOptimizationContext ctx) throws AlgebricksException {
+ public Void visitSplitOperator(SplitOperator op, IOptimizationContext arg) throws AlgebricksException {
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 10659b1..3645aff 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -48,18 +48,18 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -156,11 +156,6 @@
}
@Override
- public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException {
- return null;
- }
-
- @Override
public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
return null;
}
@@ -255,6 +250,11 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index d35153a..a746cf2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -46,18 +46,18 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -179,12 +179,6 @@
}
@Override
- public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException {
- standardLayout(op);
- return null;
- }
-
- @Override
public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
schemaVariables.addAll(op.getVariables());
return null;
@@ -288,6 +282,12 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+ standardLayout(op);
+ return null;
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
standardLayout(op);
return null;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index f2da7c4..7345928 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -46,19 +46,19 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -219,16 +219,6 @@
}
@Override
- public Void visitPartitioningSplitOperator(PartitioningSplitOperator op,
- Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
- for (Mutable<ILogicalExpression> e : op.getExpressions()) {
- e.getValue().substituteVar(pair.first, pair.second);
- }
- substVarTypes(op, pair);
- return null;
- }
-
- @Override
public Void visitProjectOperator(ProjectOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
List<LogicalVariable> usedVariables = op.getVariables();
@@ -414,6 +404,13 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+ throws AlgebricksException {
+ op.substituteVar(arg.first, arg.second);
+ return null;
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op, Pair<LogicalVariable, LogicalVariable> arg)
throws AlgebricksException {
return null;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 0d50dc2..cfe2a37 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -52,13 +52,13 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -232,14 +232,6 @@
}
@Override
- public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) {
- for (Mutable<ILogicalExpression> e : op.getExpressions()) {
- e.getValue().getUsedVariables(usedVariables);
- }
- return null;
- }
-
- @Override
public Void visitProjectOperator(ProjectOperator op, Void arg) {
List<LogicalVariable> parameterVariables = op.getVariables();
for (LogicalVariable v : parameterVariables) {
@@ -443,6 +435,14 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+ for (Mutable<ILogicalOperator> outputOp : op.getOutputs()) {
+ VariableUtilities.getUsedVariables(outputOp.getValue(), usedVariables);
+ }
+ return null;
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java
new file mode 100644
index 0000000..1d13163
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+
+public abstract class AbstractReplicatePOperator extends AbstractPhysicalOperator {
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ return emptyUnaryRequirements();
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+ int[] inputDependencyLabels = new int[] { 0 };
+ AbstractReplicateOperator rop = (AbstractReplicateOperator) op;
+ int[] outputDependencyLabels = new int[rop.getOutputArity()];
+ // change the labels of outputs that requires materialization to 1
+ boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
+ for (int i = 0; i < rop.getOutputArity(); i++) {
+ if (outputMaterializationFlags[i]) {
+ outputDependencyLabels[i] = 1;
+ }
+ }
+ return new Pair<>(inputDependencyLabels, outputDependencyLabels);
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
index 6501aeb..74739da 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
@@ -19,24 +19,19 @@
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
-public class ReplicatePOperator extends AbstractPhysicalOperator {
+public class ReplicatePOperator extends AbstractReplicatePOperator {
@Override
public PhysicalOperatorTag getOperatorTag() {
@@ -44,55 +39,25 @@
}
@Override
- public boolean isMicroOperator() {
- return false;
- }
-
- @Override
- public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
- return emptyUnaryRequirements();
- }
-
- @Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
- }
-
- @Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
IOperatorDescriptorRegistry spec = builder.getJobSpec();
- RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
ReplicateOperator rop = (ReplicateOperator) op;
int outputArity = rop.getOutputArity();
boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
- SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity, outputMaterializationFlags);
+ ReplicateOperatorDescriptor splitOpDesc = new ReplicateOperatorDescriptor(spec, recDescriptor, outputArity,
+ outputMaterializationFlags);
contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
ILogicalOperator src = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, op, 0);
}
@Override
- public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
- int[] inputDependencyLabels = new int[] { 0 };
- ReplicateOperator rop = (ReplicateOperator) op;
- int[] outputDependencyLabels = new int[rop.getOutputArity()];
- // change the labels of outputs that requires materialization to 1
- boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
- for (int i = 0; i < rop.getOutputArity(); i++) {
- if (outputMaterializationFlags[i]) {
- outputDependencyLabels[i] = 1;
- }
- }
- return new Pair<>(inputDependencyLabels, outputDependencyLabels);
- }
-
- @Override
public boolean expensiveThanMaterialization() {
return false;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
new file mode 100644
index 0000000..3b8aaab
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.std.SplitOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+
+public class SplitPOperator extends AbstractReplicatePOperator {
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.SPLIT;
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ SplitOperator sop = (SplitOperator) op;
+ int outputArity = sop.getOutputArity();
+
+ IOperatorDescriptorRegistry spec = builder.getJobSpec();
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
+
+ IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+ IScalarEvaluatorFactory brachingExprEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(
+ sop.getBranchingExpression().getValue(), context.getTypeEnvironment(op), inputSchemas, context);
+
+ IBinaryIntegerInspectorFactory intInsepctorFactory = context.getBinaryIntegerInspectorFactory();
+
+ SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity,
+ brachingExprEvalFactory, intInsepctorFactory);
+
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
+ ILogicalOperator src = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, op, 0);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 7e83880..d3dd166 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
@@ -49,13 +50,13 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -63,7 +64,6 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -236,15 +236,6 @@
}
@Override
- public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Integer indent)
- throws AlgebricksException {
- addIndent(indent).append("partitioning-split (");
- pprintExprList(op.getExpressions(), indent);
- buffer.append(")");
- return null;
- }
-
- @Override
public Void visitSubplanOperator(SubplanOperator op, Integer indent) throws AlgebricksException {
addIndent(indent).append("subplan {");
printNestedPlans(op, indent);
@@ -363,6 +354,13 @@
}
@Override
+ public Void visitSplitOperator(SplitOperator op, Integer indent) throws AlgebricksException {
+ Mutable<ILogicalExpression> branchingExpression = op.getBranchingExpression();
+ addIndent(indent).append("split " + branchingExpression.getValue().accept(exprVisitor, indent));
+ return null;
+ }
+
+ @Override
public Void visitMaterializeOperator(MaterializeOperator op, Integer indent) throws AlgebricksException {
addIndent(indent).append("materialize");
return null;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index c0f9718..f5ff8b4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -34,18 +34,18 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -82,10 +82,10 @@
public R visitProjectOperator(ProjectOperator op, T arg) throws AlgebricksException;
- public R visitPartitioningSplitOperator(PartitioningSplitOperator op, T arg) throws AlgebricksException;
-
public R visitReplicateOperator(ReplicateOperator op, T arg) throws AlgebricksException;
+ public R visitSplitOperator(SplitOperator op, T arg) throws AlgebricksException;
+
public R visitMaterializeOperator(MaterializeOperator op, T arg) throws AlgebricksException;
public R visitScriptOperator(ScriptOperator op, T arg) throws AlgebricksException;
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
index 39cac06..c5d7291 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
@@ -48,12 +48,12 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -174,13 +174,12 @@
}
@Override
- public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
- throws AlgebricksException {
+ public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
return visit(op);
}
@Override
- public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
return visit(op);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
deleted file mode 100644
index 2d5c929..0000000
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.operators.std;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-
-public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
- public static int NO_DEFAULT_BRANCH = -1;
-
- private final IScalarEvaluatorFactory[] evalFactories;
- private final IBinaryBooleanInspector boolInspector;
- private final int defaultBranchIndex;
-
- public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec,
- IScalarEvaluatorFactory[] evalFactories, IBinaryBooleanInspector boolInspector, int defaultBranchIndex,
- RecordDescriptor rDesc) {
- super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length + 1 : evalFactories.length);
- for (int i = 0; i < evalFactories.length; i++) {
- recordDescriptors[i] = rDesc;
- }
- this.evalFactories = evalFactories;
- this.boolInspector = boolInspector;
- this.defaultBranchIndex = defaultBranchIndex;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
- throws HyracksDataException {
- return new AbstractUnaryInputOperatorNodePushable() {
- private final IFrameWriter[] writers = new IFrameWriter[outputArity];
- private final boolean[] isOpen = new boolean[outputArity];
- private final IFrame[] writeBuffers = new IFrame[outputArity];
- private final IScalarEvaluator[] evals = new IScalarEvaluator[outputArity];
- private final IPointable evalPointable = new VoidPointable();
- private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(),
- 0);
- private final FrameTupleAccessor accessor = new FrameTupleAccessor(inOutRecDesc);
- private final FrameTupleReference frameTuple = new FrameTupleReference();
-
- private final FrameTupleAppender tupleAppender = new FrameTupleAppender();
- private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(inOutRecDesc.getFieldCount());
- private final DataOutput tupleDos = tupleBuilder.getDataOutput();
-
- @Override
- public void close() throws HyracksDataException {
- HyracksDataException hde = null;
- for (int i = 0; i < outputArity; i++) {
- if (isOpen[i]) {
- try {
- tupleAppender.reset(writeBuffers[i], false);
- tupleAppender.write(writers[i], false);
- } catch (Throwable th) {
- if (hde == null) {
- hde = new HyracksDataException();
- }
- hde.addSuppressed(th);
- } finally {
- try {
- writers[i].close();
- } catch (Throwable th) {
- if (hde == null) {
- hde = new HyracksDataException();
- }
- hde.addSuppressed(th);
- }
- }
- }
- }
- if (hde != null) {
- throw hde;
- }
- }
-
- @Override
- public void flush() throws HyracksDataException {
- for (int i = 0; i < outputArity; i++) {
- tupleAppender.reset(writeBuffers[i], false);
- tupleAppender.flush(writers[i]);
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- HyracksDataException hde = null;
- for (int i = 0; i < outputArity; i++) {
- if (isOpen[i]) {
- try {
- writers[i].fail();
- } catch (Throwable th) {
- if (hde == null) {
- hde = new HyracksDataException();
- }
- hde.addSuppressed(th);
- }
- }
- }
- if (hde != null) {
- throw hde;
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
- int tupleCount = accessor.getTupleCount();
- for (int i = 0; i < tupleCount; i++) {
- frameTuple.reset(accessor, i);
- boolean found = false;
- for (int j = 0; j < evals.length; j++) {
- try {
- evals[j].evaluate(frameTuple, evalPointable);
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
- found = boolInspector.getBooleanValue(evalPointable.getByteArray(),
- evalPointable.getStartOffset(), evalPointable.getLength());
- if (found) {
- copyAndAppendTuple(j);
- break;
- }
- }
- // Optionally write to default output branch.
- if (!found && defaultBranchIndex != NO_DEFAULT_BRANCH) {
- copyAndAppendTuple(defaultBranchIndex);
- }
- }
- }
-
- private void copyAndAppendTuple(int outputIndex) throws HyracksDataException {
- // Copy tuple into tuple builder.
- try {
- tupleBuilder.reset();
- for (int i = 0; i < frameTuple.getFieldCount(); i++) {
- tupleDos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
- frameTuple.getFieldLength(i));
- tupleBuilder.addFieldEndOffset();
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- tupleAppender.reset(writeBuffers[outputIndex], false);
- FrameUtils.appendToWriter(writers[outputIndex], tupleAppender, tupleBuilder.getFieldEndOffsets(),
- tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
- }
-
- @Override
- public void open() throws HyracksDataException {
- for (int i = 0; i < writers.length; i++) {
- isOpen[i] = true;
- writers[i].open();
- }
- // Create write buffers.
- for (int i = 0; i < outputArity; i++) {
- writeBuffers[i] = new VSizeFrame(ctx);
- // Make sure to clear all buffers, since we are reusing the tupleAppender.
- tupleAppender.reset(writeBuffers[i], true);
- }
- // Create evaluators for partitioning.
- try {
- for (int i = 0; i < evalFactories.length; i++) {
- evals[i] = evalFactories[i].createScalarEvaluator(ctx);
- }
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- writers[index] = writer;
- }
- };
- }
-}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
new file mode 100644
index 0000000..2215a96
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.std.base.AbstractReplicateOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
+/**
+ * Split operator propagates each tuple in a frame to one output branch only unlike Replicate operator.
+ */
+public class SplitOperatorDescriptor extends AbstractReplicateOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ private IScalarEvaluatorFactory brachingExprEvalFactory;
+ private IBinaryIntegerInspectorFactory intInsepctorFactory;
+
+ public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity,
+ IScalarEvaluatorFactory brachingExprEvalFactory, IBinaryIntegerInspectorFactory intInsepctorFactory) {
+ super(spec, rDesc, outputArity);
+ this.brachingExprEvalFactory = brachingExprEvalFactory;
+ this.intInsepctorFactory = intInsepctorFactory;
+ }
+
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode(
+ new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
+ builder.addActivity(this, sma);
+ builder.addSourceEdge(0, sma, 0);
+ for (int i = 0; i < outputArity; i++) {
+ builder.addTargetEdge(i, sma, i);
+ }
+ }
+
+ // The difference between SplitterMaterializerActivityNode and ReplicatorMaterializerActivityNode is that
+ // SplitterMaterializerActivityNode propagates each tuple to one output branch only.
+ private final class SplitterMaterializerActivityNode extends ReplicatorMaterializerActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public SplitterMaterializerActivityNode(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ throws HyracksDataException {
+ final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs];
+ final boolean[] isOpen = new boolean[numberOfNonMaterializedOutputs];
+ final IPointable p = VoidPointable.FACTORY.createPointable();;
+ // To deal with each tuple in a frame
+ final FrameTupleAccessor accessor = new FrameTupleAccessor(recordDescriptors[0]);;
+ final FrameTupleAppender[] appenders = new FrameTupleAppender[numberOfNonMaterializedOutputs];
+ final FrameTupleReference tRef = new FrameTupleReference();;
+ final IBinaryIntegerInspector intInsepctor = intInsepctorFactory.createBinaryIntegerInspector(ctx);
+ final IScalarEvaluator eval;
+ try {
+ eval = brachingExprEvalFactory.createScalarEvaluator(ctx);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ appenders[i] = new FrameTupleAppender(new VSizeFrame(ctx), true);
+ }
+
+ return new AbstractUnaryInputOperatorNodePushable() {
+ @Override
+ public void open() throws HyracksDataException {
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ isOpen[i] = true;
+ writers[i].open();
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
+ // Tuple based access
+ accessor.reset(bufferAccessor);
+ int tupleCount = accessor.getTupleCount();
+ // The output branch number that starts from 0.
+ int outputBranch;
+
+ for (int i = 0; i < tupleCount; i++) {
+ // Get the output branch number from the field in the given tuple.
+ tRef.reset(accessor, i);
+ try {
+ eval.evaluate(tRef, p);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ outputBranch = intInsepctor.getIntegerValue(p.getByteArray(), p.getStartOffset(),
+ p.getLength());
+
+ // Add this tuple to the correct output frame.
+ FrameUtils.appendToWriter(writers[outputBranch], appenders[outputBranch], accessor, i);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ HyracksDataException hde = null;
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ if (isOpen[i]) {
+ try {
+ appenders[i].write(writers[i], true);
+ writers[i].close();
+ } catch (Throwable th) {
+ if (hde == null) {
+ hde = new HyracksDataException(th);
+ } else {
+ hde.addSuppressed(th);
+ }
+ }
+ }
+ }
+ if (hde != null) {
+ throw hde;
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ HyracksDataException hde = null;
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ if (isOpen[i]) {
+ try {
+ writers[i].fail();
+ } catch (Throwable th) {
+ if (hde == null) {
+ hde = new HyracksDataException(th);
+ } else {
+ hde.addSuppressed(th);
+ }
+ }
+ }
+ }
+ if (hde != null) {
+ throw hde;
+ }
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ writers[index] = writer;
+ }
+ };
+ }
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl
new file mode 100644
index 0000000..0ea8a88
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl
@@ -0,0 +1,4 @@
+0,first branch1
+0,first branch2
+0,first branch3
+0,first branch4
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl
new file mode 100644
index 0000000..53588ef
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl
@@ -0,0 +1,3 @@
+1,second branch1
+1,second branch2
+1,second branch3
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl
new file mode 100644
index 0000000..ceb859a
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl
@@ -0,0 +1,7 @@
+0|first branch1
+1|second branch1
+0|first branch2
+1|second branch2
+0|first branch3
+1|second branch3
+0|first branch4
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 020cffe..1276518 100644
--- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -50,6 +50,7 @@
import org.apache.hyracks.algebricks.runtime.operators.std.PrinterRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.RunningAggregateRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.std.SplitOperatorDescriptor;
import org.apache.hyracks.algebricks.runtime.operators.std.StreamLimitRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.StreamProjectRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
@@ -83,7 +84,7 @@
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -570,7 +571,7 @@
}
@Test
- public void scanSplitWrite() throws Exception {
+ public void scanReplicateWrite() throws Exception {
final int outputArity = 2;
JobSpecification spec = new JobSpecification(FRAME_SIZE);
@@ -596,7 +597,69 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp,
new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
- SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity);
+ ReplicateOperatorDescriptor replicateOp = new ReplicateOperatorDescriptor(spec, stringRec, outputArity);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, replicateOp,
+ new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+ IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
+ for (int i = 0; i < outputArity; i++) {
+ outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] {
+ new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(outputFile[i])) });
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i],
+ new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+ }
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, replicateOp, 0);
+ for (int i = 0; i < outputArity; i++) {
+ spec.connect(new OneToOneConnectorDescriptor(spec), replicateOp, i, outputOp[i], 0);
+ }
+
+ for (int i = 0; i < outputArity; i++) {
+ spec.addRoot(outputOp[i]);
+ }
+ AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+ for (int i = 0; i < outputArity; i++) {
+ compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+ }
+ }
+
+ @Test
+ public void scanSplitWrite() throws Exception {
+ final int outputArity = 2;
+
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
+
+ String inputFileName[] = { "data/simple/int-string-part1.tbl", "data/simple/int-string-part1-split-0.tbl",
+ "data/simple/int-string-part1-split-1.tbl" };
+ File[] inputFiles = new File[inputFileName.length];
+ for (int i=0; i<inputFileName.length; i++) {
+ inputFiles[i] = new File(inputFileName[i]);
+ }
+ File[] outputFile = new File[outputArity];
+ for (int i = 0; i < outputArity; i++) {
+ outputFile[i] = File.createTempFile("splitop", null);
+ }
+
+ FileSplit[] inputSplits = new FileSplit[] {
+ new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(inputFiles[0])) };
+ IFileSplitProvider intSplitProvider = new ConstantFileSplitProvider(inputSplits);
+
+ RecordDescriptor scannerDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+ new UTF8StringSerializerDeserializer() });
+
+ IValueParserFactory[] valueParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE };
+
+ FileScanOperatorDescriptor intScanner = new FileScanOperatorDescriptor(spec, intSplitProvider,
+ new DelimitedDataTupleParserFactory(valueParsers, '|'), scannerDesc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, intScanner, DEFAULT_NODES);
+
+ SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, scannerDesc, outputArity,
+ new TupleFieldEvaluatorFactory(0), BinaryIntegerInspectorImpl.FACTORY);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp,
new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
@@ -609,7 +672,7 @@
new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
}
- spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), intScanner, 0, splitOp, 0);
for (int i = 0; i < outputArity; i++) {
spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0);
}
@@ -620,7 +683,7 @@
AlgebricksHyracksIntegrationUtil.runJob(spec);
for (int i = 0; i < outputArity; i++) {
- compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+ compareFiles(inputFileName[i + 1], outputFile[i].getAbsolutePath());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
similarity index 79%
rename from hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
rename to hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
index 67af861..5c642ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.dataflow.std.misc;
+package org.apache.hyracks.dataflow.std.base;
import java.nio.ByteBuffer;
@@ -32,28 +32,32 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.misc.MaterializerTaskState;
-public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+/**
+ * Abstract class for two replication related operator descriptor - replicate and split
+ * Replicate operator propagates all frames to all output branches.
+ * That is, each tuple will be propagated to all output branches.
+ * Split operator propagates each tuple in a frame to one output branch only.
+ */
+public abstract class AbstractReplicateOperatorDescriptor extends AbstractOperatorDescriptor {
+ protected static final long serialVersionUID = 1L;
- private final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0;
- private final static int MATERIALIZE_READER_ACTIVITY_ID = 1;
+ protected final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0;
+ protected final static int MATERIALIZE_READER_ACTIVITY_ID = 1;
- private final boolean[] outputMaterializationFlags;
- private final boolean requiresMaterialization;
- private final int numberOfNonMaterializedOutputs;
- private final int numberOfMaterializedOutputs;
+ protected final boolean[] outputMaterializationFlags;
+ protected final boolean requiresMaterialization;
+ protected final int numberOfNonMaterializedOutputs;
+ protected final int numberOfMaterializedOutputs;
- public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity) {
+ public AbstractReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
+ int outputArity) {
this(spec, rDesc, outputArity, new boolean[outputArity]);
}
- public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity,
- boolean[] outputMaterializationFlags) {
+ public AbstractReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
+ int outputArity, boolean[] outputMaterializationFlags) {
super(spec, 1, outputArity);
for (int i = 0; i < outputArity; i++) {
recordDescriptors[i] = rDesc;
@@ -80,16 +84,16 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- SplitterMaterializerActivityNode sma =
- new SplitterMaterializerActivityNode(new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
+ ReplicatorMaterializerActivityNode sma = new ReplicatorMaterializerActivityNode(
+ new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
builder.addActivity(this, sma);
builder.addSourceEdge(0, sma, 0);
int pipelineOutputIndex = 0;
int activityId = MATERIALIZE_READER_ACTIVITY_ID;
for (int i = 0; i < outputArity; i++) {
if (outputMaterializationFlags[i]) {
- MaterializeReaderActivityNode mra =
- new MaterializeReaderActivityNode(new ActivityId(odId, activityId++));
+ MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(
+ new ActivityId(odId, activityId++));
builder.addActivity(this, mra);
builder.addBlockingEdge(sma, mra);
builder.addTargetEdge(i, mra, 0);
@@ -99,16 +103,17 @@
}
}
- private final class SplitterMaterializerActivityNode extends AbstractActivityNode {
+ protected class ReplicatorMaterializerActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- public SplitterMaterializerActivityNode(ActivityId id) {
+ public ReplicatorMaterializerActivityNode(ActivityId id) {
super(id);
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ throws HyracksDataException {
return new AbstractUnaryInputOperatorNodePushable() {
private MaterializerTaskState state;
private final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs];
@@ -140,10 +145,8 @@
@Override
public void flush() throws HyracksDataException {
- if (!requiresMaterialization) {
- for (IFrameWriter writer : writers) {
- writer.flush();
- }
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ writers[i].flush();
}
}
@@ -204,7 +207,7 @@
}
}
- private final class MaterializeReaderActivityNode extends AbstractActivityNode {
+ protected class MaterializeReaderActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
public MaterializeReaderActivityNode(ActivityId id) {
@@ -227,5 +230,4 @@
};
}
}
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
new file mode 100644
index 0000000..0782647
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.misc;
+
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractReplicateOperatorDescriptor;
+
+public class ReplicateOperatorDescriptor extends AbstractReplicateOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public ReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity) {
+ this(spec, rDesc, outputArity, new boolean[outputArity]);
+ }
+
+ public ReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity,
+ boolean[] outputMaterializationFlags) {
+ super(spec, rDesc, outputArity, outputMaterializationFlags);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
similarity index 92%
rename from hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java
rename to hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
index 40b4251..9a56cd3 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
@@ -23,9 +23,6 @@
import java.io.FileReader;
import java.io.IOException;
-import org.junit.Assert;
-import org.junit.Test;
-
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -40,11 +37,13 @@
import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider;
+import org.junit.Assert;
+import org.junit.Test;
-public class SplitOperatorTest extends AbstractIntegrationTest {
+public class ReplicateOperatorTest extends AbstractIntegrationTest {
public void compareFiles(String fileNameA, String fileNameB) throws IOException {
BufferedReader fileA = new BufferedReader(new FileReader(fileNameA));
@@ -69,7 +68,7 @@
String inputFileName = "data/words.txt";
File[] outputFile = new File[outputArity];
for (int i = 0; i < outputArity; i++) {
- outputFile[i] = File.createTempFile("splitop", null);
+ outputFile[i] = File.createTempFile("replicateop", null);
outputFile[i].deleteOnExit();
}
@@ -86,8 +85,8 @@
inputSplits), stringParser, stringRec);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, locations);
- SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp, locations);
+ ReplicateOperatorDescriptor replicateOp = new ReplicateOperatorDescriptor(spec, stringRec, outputArity);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, replicateOp, locations);
IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
for (int i = 0; i < outputArity; i++) {
@@ -99,9 +98,9 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
}
- spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, replicateOp, 0);
for (int i = 0; i < outputArity; i++) {
- spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), replicateOp, i, outputOp[i], 0);
}
for (int i = 0; i < outputArity; i++) {