[NO ISSUE] Create an abstraction for the ForwardOperatorDescriptor
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Abstract the activities out of the ForwardOperatorDescriptor so we can reuse
the same basic framework for multiple forward Operators.
- Abstract the ForwardOperatorDescriptor out of the ForwardPOperator so we can
reuse the same basic framework for multiple forward Operators.
Change-Id: Icc3db4b386e69a98c2a1c40dadc96eb3e1a5d4fa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3338
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
index 86b2b88..2d5e11e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
@@ -310,7 +310,7 @@
@Override
public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
- sweepExpression(op.getRangeMapExpression().getValue());
+ sweepExpression(op.getSideDataExpression().getValue());
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
index bc24b49..70e36e5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
@@ -33,30 +33,30 @@
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
/**
- * Forward operator is used to forward data to different NCs based on a range map that is computed dynamically
- * by doing a pass over the data itself to infer the range map. The operator takes two inputs:
+ * Forward operator is used to forward data to different NCs based on the side data activity that is computed
+ * dynamically by doing a pass over the data itself to infer the range map. The operator takes two inputs:
* 1. Tuples/data (at index 0). The data is forwarded to the range-based connector which routes it to the target NC.
- * 2. Range map (at index 1). The range map will be stored in Hyracks context, and the connector will pick it up.
- * Forward operator will receive the range map when it is broadcast by the operator generating the range map after which
- * the forward operator will start forwarding the data.
+ * 2. Side Activity (at index 1). The output will be stored in Hyracks context, and the connector will pick it up.
+ * Forward operator will receive the range map when it is broadcast by the operator generating the side activity output
+ * after which the forward operator will start forwarding the data.
*/
public class ForwardOperator extends AbstractLogicalOperator {
- private final String rangeMapKey;
- private final Mutable<ILogicalExpression> rangeMapExpression;
+ private final String sideDataKey;
+ private final Mutable<ILogicalExpression> sideDataExpression;
- public ForwardOperator(String rangeMapKey, Mutable<ILogicalExpression> rangeMapExpression) {
+ public ForwardOperator(String sideDataKey, Mutable<ILogicalExpression> sideDataExpression) {
super();
- this.rangeMapKey = rangeMapKey;
- this.rangeMapExpression = rangeMapExpression;
+ this.sideDataKey = sideDataKey;
+ this.sideDataExpression = sideDataExpression;
}
- public String getRangeMapKey() {
- return rangeMapKey;
+ public String getSideDataKey() {
+ return sideDataKey;
}
- public Mutable<ILogicalExpression> getRangeMapExpression() {
- return rangeMapExpression;
+ public Mutable<ILogicalExpression> getSideDataExpression() {
+ return sideDataExpression;
}
@Override
@@ -72,7 +72,7 @@
@Override
public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
- return visitor.transform(rangeMapExpression);
+ return visitor.transform(sideDataExpression);
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index dd3053b..09358dd 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -605,9 +605,9 @@
return Boolean.FALSE;
}
ForwardOperator otherOp = (ForwardOperator) copyAndSubstituteVar(op, arg);
- ILogicalExpression rangeMapExp = op.getRangeMapExpression().getValue();
- ILogicalExpression otherRangeMapExp = otherOp.getRangeMapExpression().getValue();
- return rangeMapExp.equals(otherRangeMapExp) && op.getRangeMapKey().equals(otherOp.getRangeMapKey());
+ ILogicalExpression rangeMapExp = op.getSideDataExpression().getValue();
+ ILogicalExpression otherRangeMapExp = otherOp.getSideDataExpression().getValue();
+ return rangeMapExp.equals(otherRangeMapExp) && op.getSideDataKey().equals(otherOp.getSideDataKey());
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 99e852d..34b0ae6 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -590,8 +590,8 @@
@Override
public ILogicalOperator visitForwardOperator(ForwardOperator op, ILogicalOperator arg) throws AlgebricksException {
- ForwardOperator opCopy = new ForwardOperator(op.getRangeMapKey(),
- exprDeepCopyVisitor.deepCopyExpressionReference(op.getRangeMapExpression()));
+ ForwardOperator opCopy = new ForwardOperator(op.getSideDataKey(),
+ exprDeepCopyVisitor.deepCopyExpressionReference(op.getSideDataExpression()));
deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
return opCopy;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 5be91cc..1727d10 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -345,7 +345,7 @@
@Override
public ILogicalOperator visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
- return new ForwardOperator(op.getRangeMapKey(), deepCopyExpressionRef(op.getRangeMapExpression()));
+ return new ForwardOperator(op.getSideDataKey(), deepCopyExpressionRef(op.getSideDataExpression()));
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 550a208..028bf9f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -454,7 +454,7 @@
@Override
public Void visitForwardOperator(ForwardOperator op, Pair<LogicalVariable, LogicalVariable> arg)
throws AlgebricksException {
- op.getRangeMapExpression().getValue().substituteVar(arg.first, arg.second);
+ op.getSideDataExpression().getValue().substituteVar(arg.first, arg.second);
substVarTypes(op, arg);
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 39b9689..845a853 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -428,7 +428,7 @@
@Override
public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
- op.getRangeMapExpression().getValue().getUsedVariables(usedVariables);
+ op.getSideDataExpression().getValue().getUsedVariables(usedVariables);
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractForwardPOperator.java
similarity index 83%
rename from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
rename to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractForwardPOperator.java
index 11c584e..778af18 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractForwardPOperator.java
@@ -39,17 +39,17 @@
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.std.misc.ForwardOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractForwardOperatorDescriptor;
/**
* <pre>
- * {@see {@link ForwardOperator} and {@link ForwardOperatorDescriptor}}
- * idx0: Input data source --
- * |-- forward op.
- * idx1: RangeMap generator--
+ * {@see {@link ForwardOperator} and {@link AbstractForwardOperatorDescriptor}}
+ * idx0: Input data source --
+ * |-- forward op.
+ * idx1: Side activity output --
* </pre>
*/
-public class ForwardPOperator extends AbstractPhysicalOperator {
+public abstract class AbstractForwardPOperator extends AbstractPhysicalOperator {
@Override
public PhysicalOperatorTag getOperatorTag() {
@@ -57,8 +57,18 @@
}
/**
- * Forward operator requires that the global aggregate operator broadcasts the range map. No required properties at
- * the data source input.
+ * Get the correct Forward Operator Descriptor
+ * @param builder Hyracks job builder
+ * @param forwardOp Forward Operator
+ * @param dataInputDescriptor Data input descriptor
+ * @return return the correct operator descriptor
+ */
+ public abstract AbstractForwardOperatorDescriptor getOperatorDescriptor(IHyracksJobBuilder builder,
+ ForwardOperator forwardOp, RecordDescriptor dataInputDescriptor);
+
+ /**
+ * Forward operator requires that the global aggregate operator broadcasts side activity output.
+ * No required properties at the data source input.
* @param op {@see {@link ForwardOperator}}
* @param requiredByParent parent's requirements, which are not enforced for now, as we only explore one plan
* @param context the optimization context
@@ -67,7 +77,7 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector requiredByParent, IOptimizationContext context) {
- // broadcast the range map to the cluster node domain
+ // broadcast the side activity output to the cluster node domain
INodeDomain targetDomain = context.getComputationNodeDomain();
List<ILocalStructuralProperty> noProp = new ArrayList<>();
StructuralPropertiesVector[] requiredAtInputs = new StructuralPropertiesVector[2];
@@ -108,13 +118,13 @@
ForwardOperator forwardOp = (ForwardOperator) op;
RecordDescriptor dataInputDescriptor = JobGenHelper.mkRecordDescriptor(
context.getTypeEnvironment(forwardOp.getInputs().get(0).getValue()), inputSchemas[0], context);
- ForwardOperatorDescriptor forwardDescriptor =
- new ForwardOperatorDescriptor(builder.getJobSpec(), forwardOp.getRangeMapKey(), dataInputDescriptor);
+ AbstractForwardOperatorDescriptor forwardDescriptor =
+ getOperatorDescriptor(builder, forwardOp, dataInputDescriptor);
builder.contributeHyracksOperator(forwardOp, forwardDescriptor);
ILogicalOperator dataSource = forwardOp.getInputs().get(0).getValue();
builder.contributeGraphEdge(dataSource, 0, forwardOp, 0);
- ILogicalOperator rangemapSource = forwardOp.getInputs().get(1).getValue();
- builder.contributeGraphEdge(rangemapSource, 0, forwardOp, 1);
+ ILogicalOperator sideDataSource = forwardOp.getInputs().get(1).getValue();
+ builder.contributeGraphEdge(sideDataSource, 0, forwardOp, 1);
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortForwardPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortForwardPOperator.java
new file mode 100644
index 0000000..0903b5e
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortForwardPOperator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractForwardOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.SortForwardOperatorDescriptor;
+
+public class SortForwardPOperator extends AbstractForwardPOperator {
+
+ @Override
+ public AbstractForwardOperatorDescriptor getOperatorDescriptor(IHyracksJobBuilder builder,
+ ForwardOperator forwardOp, RecordDescriptor dataInputDescriptor) {
+ return new SortForwardOperatorDescriptor(builder.getJobSpec(), forwardOp.getSideDataKey(), dataInputDescriptor);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 27d4ced..4128c8b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -467,7 +467,7 @@
@Override
public Void visitForwardOperator(ForwardOperator op, Integer indent) throws AlgebricksException {
addIndent(indent)
- .append("forward: range-map = " + op.getRangeMapExpression().getValue().accept(exprVisitor, indent));
+ .append("forward: range-map = " + op.getSideDataExpression().getValue().accept(exprVisitor, indent));
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index f5ff12f..e56a8bd 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -639,7 +639,7 @@
addIndent(indent).append("\"operator\": \"forward\"");
addIndent(0).append(",\n");
addIndent(indent).append("\"expressions\": \""
- + op.getRangeMapExpression().getValue().accept(exprVisitor, indent).replace('"', ' ') + "\"");
+ + op.getSideDataExpression().getValue().accept(exprVisitor, indent).replace('"', ' ') + "\"");
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 1c761fa..795fb10 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -590,7 +590,7 @@
@Override
public String visitForwardOperator(ForwardOperator op, Boolean showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
- stringBuilder.append("forward(").append(op.getRangeMapExpression().getValue().toString()).append(")");
+ stringBuilder.append("forward(").append(op.getSideDataExpression().getValue().toString()).append(")");
appendSchema(op, showDetails);
appendAnnotations(op, showDetails);
appendPhysicalOperatorInfo(op, showDetails);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index f66a6b8..7dc596c 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -65,7 +65,6 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.ForwardPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
@@ -76,6 +75,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.SequentialMergeExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortForwardPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
@@ -880,7 +880,7 @@
AbstractLogicalExpression rangeMapExpression = new VariableReferenceExpression(rangeMapVariable, sourceLoc);
ForwardOperator forwardOperator = new ForwardOperator(rangeMapKey, new MutableObject<>(rangeMapExpression));
forwardOperator.setSourceLocation(sourceLoc);
- forwardOperator.setPhysicalOperator(new ForwardPOperator());
+ forwardOperator.setPhysicalOperator(new SortForwardPOperator());
forwardOperator.getInputs().add(exchangeOpFromReplicate);
forwardOperator.getInputs().add(globalAggInput);
OperatorManipulationUtil.setOperatorMode(forwardOperator);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 612f79e..a7bf11e 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -61,7 +61,6 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.ForwardPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator;
@@ -78,6 +77,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortForwardPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.SplitPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
@@ -398,7 +398,7 @@
break;
}
case FORWARD:
- op.setPhysicalOperator(new ForwardPOperator());
+ op.setPhysicalOperator(new SortForwardPOperator());
break;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
new file mode 100644
index 0000000..b57ad16
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.base;
+
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+
+// TODO(ali): forward operator should probably be moved to asterix layer
+public abstract class AbstractForwardOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ protected static final int FORWARD_DATA_ACTIVITY_ID = 0;
+ protected static final int SIDE_DATA_ACTIVITY_ID = 1;
+ protected String sideDataKey;
+
+ /**
+ * @param spec used to create the operator id.
+ * @param sideDataKey the key used to store the output of the side activity
+ * @param outputRecordDescriptor the output schema of this operator.
+ */
+ public AbstractForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, String sideDataKey,
+ RecordDescriptor outputRecordDescriptor) {
+ super(spec, 2, 1);
+ outRecDescs[0] = outputRecordDescriptor;
+ this.sideDataKey = sideDataKey;
+ }
+
+ /**
+ * @return the forward data activity
+ */
+ public abstract AbstractActivityNode createForwardDataActivity();
+
+ /**
+ * @return the side data activity
+ */
+ public abstract AbstractActivityNode createSideDataActivity();
+
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ AbstractActivityNode forwardDataActivity = createForwardDataActivity();
+ AbstractActivityNode sideDataActivity = createSideDataActivity();
+
+ // side data activity, its input is coming through the operator's in-port = 1 & activity's in-port = 0
+ builder.addActivity(this, sideDataActivity);
+ builder.addSourceEdge(1, sideDataActivity, 0);
+
+ // forward data activity, its input is coming through the operator's in-port = 0 & activity's in-port = 0
+ builder.addActivity(this, forwardDataActivity);
+ builder.addSourceEdge(0, forwardDataActivity, 0);
+
+ // forward data activity will wait for the side data activity
+ builder.addBlockingEdge(sideDataActivity, forwardDataActivity);
+
+ // data leaves from the operator's out-port = 0 & forward data activity's out-port = 0
+ builder.addTargetEdge(0, forwardDataActivity, 0);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
similarity index 83%
rename from hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
rename to hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
index 49eea0a..1daf9fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
@@ -24,7 +24,6 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -42,50 +41,33 @@
import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
import org.apache.hyracks.dataflow.common.utils.TaskUtil;
import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractForwardOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
// TODO(ali): forward operator should probably be moved to asterix layer
-public class ForwardOperatorDescriptor extends AbstractOperatorDescriptor {
+public class SortForwardOperatorDescriptor extends AbstractForwardOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private static final int FORWARD_DATA_ACTIVITY_ID = 0;
- private static final int RANGEMAP_READER_ACTIVITY_ID = 1;
- private final String rangeMapKeyInContext;
/**
* @param spec used to create the operator id.
- * @param rangeMapKeyInContext the unique key to store the range map in the shared map & transfer it to partitioner.
+ * @param sideDataKey the unique key to store the range map in the shared map & transfer it to partitioner.
* @param outputRecordDescriptor the output schema of this operator.
*/
- public ForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, String rangeMapKeyInContext,
+ public SortForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, String sideDataKey,
RecordDescriptor outputRecordDescriptor) {
- super(spec, 2, 1);
- this.rangeMapKeyInContext = rangeMapKeyInContext;
- outRecDescs[0] = outputRecordDescriptor;
+ super(spec, sideDataKey, outputRecordDescriptor);
}
@Override
- public void contributeActivities(IActivityGraphBuilder builder) {
- ForwardDataActivity forwardDataActivity =
- new ForwardDataActivity(new ActivityId(odId, FORWARD_DATA_ACTIVITY_ID));
- RangeMapReaderActivity rangeMapReaderActivity =
- new RangeMapReaderActivity(new ActivityId(odId, RANGEMAP_READER_ACTIVITY_ID));
+ public AbstractActivityNode createForwardDataActivity() {
+ return new ForwardDataActivity(new ActivityId(odId, FORWARD_DATA_ACTIVITY_ID));
+ }
- // range map reader activity, its input is coming through the operator's in-port = 1 & activity's in-port = 0
- builder.addActivity(this, rangeMapReaderActivity);
- builder.addSourceEdge(1, rangeMapReaderActivity, 0);
-
- // forward data activity, its input is coming through the operator's in-port = 0 & activity's in-port = 0
- builder.addActivity(this, forwardDataActivity);
- builder.addSourceEdge(0, forwardDataActivity, 0);
-
- // forward data activity will wait for the range map reader activity
- builder.addBlockingEdge(rangeMapReaderActivity, forwardDataActivity);
-
- // data leaves from the operator's out-port = 0 & forward data activity's out-port = 0
- builder.addTargetEdge(0, forwardDataActivity, 0);
+ @Override
+ public AbstractActivityNode createSideDataActivity() {
+ return new RangeMapReaderActivity(new ActivityId(odId, SIDE_DATA_ACTIVITY_ID));
}
/**
@@ -221,9 +203,9 @@
public void open() throws HyracksDataException {
// retrieve the range map from the state object (previous activity should have already stored it)
// then deposit it into the ctx so that MToN-partition can pick it up
- Object stateObjKey = new TaskId(new ActivityId(odId, RANGEMAP_READER_ACTIVITY_ID), partition);
+ Object stateObjKey = new TaskId(new ActivityId(odId, SIDE_DATA_ACTIVITY_ID), partition);
RangeMapState rangeMapState = (RangeMapState) ctx.getStateObject(stateObjKey);
- TaskUtil.put(rangeMapKeyInContext, rangeMapState.rangeMap, ctx);
+ TaskUtil.put(sideDataKey, rangeMapState.rangeMap, ctx);
writer.open();
}