[NO ISSUE] Create an abstraction for the ForwardOperatorDescriptor

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Abstract the activities out of the ForwardOperatorDescriptor so we can reuse
the same basic framework for multiple forward Operators.
- Abstract the ForwardOperatorDescriptor out of the ForwardPOperator so we can
reuse the same basic framework for multiple forward Operators.

Change-Id: Icc3db4b386e69a98c2a1c40dadc96eb3e1a5d4fa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3338
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
index 86b2b88..2d5e11e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
@@ -310,7 +310,7 @@
 
         @Override
         public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
-            sweepExpression(op.getRangeMapExpression().getValue());
+            sweepExpression(op.getSideDataExpression().getValue());
             return null;
         }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
index bc24b49..70e36e5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
@@ -33,30 +33,30 @@
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 /**
- * Forward operator is used to forward data to different NCs based on a range map that is computed dynamically
- * by doing a pass over the data itself to infer the range map. The operator takes two inputs:
+ * Forward operator is used to forward data to different NCs based on the side data activity that is computed
+ * dynamically by doing a pass over the data itself to infer the range map. The operator takes two inputs:
  * 1. Tuples/data (at index 0). The data is forwarded to the range-based connector which routes it to the target NC.
- * 2. Range map (at index 1). The range map will be stored in Hyracks context, and the connector will pick it up.
- * Forward operator will receive the range map when it is broadcast by the operator generating the range map after which
- * the forward operator will start forwarding the data.
+ * 2. Side Activity (at index 1). The output will be stored in Hyracks context, and the connector will pick it up.
+ * Forward operator will receive the range map when it is broadcast by the operator generating the side activity output
+ * after which the forward operator will start forwarding the data.
  */
 public class ForwardOperator extends AbstractLogicalOperator {
 
-    private final String rangeMapKey;
-    private final Mutable<ILogicalExpression> rangeMapExpression;
+    private final String sideDataKey;
+    private final Mutable<ILogicalExpression> sideDataExpression;
 
-    public ForwardOperator(String rangeMapKey, Mutable<ILogicalExpression> rangeMapExpression) {
+    public ForwardOperator(String sideDataKey, Mutable<ILogicalExpression> sideDataExpression) {
         super();
-        this.rangeMapKey = rangeMapKey;
-        this.rangeMapExpression = rangeMapExpression;
+        this.sideDataKey = sideDataKey;
+        this.sideDataExpression = sideDataExpression;
     }
 
-    public String getRangeMapKey() {
-        return rangeMapKey;
+    public String getSideDataKey() {
+        return sideDataKey;
     }
 
-    public Mutable<ILogicalExpression> getRangeMapExpression() {
-        return rangeMapExpression;
+    public Mutable<ILogicalExpression> getSideDataExpression() {
+        return sideDataExpression;
     }
 
     @Override
@@ -72,7 +72,7 @@
 
     @Override
     public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
-        return visitor.transform(rangeMapExpression);
+        return visitor.transform(sideDataExpression);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index dd3053b..09358dd 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -605,9 +605,9 @@
             return Boolean.FALSE;
         }
         ForwardOperator otherOp = (ForwardOperator) copyAndSubstituteVar(op, arg);
-        ILogicalExpression rangeMapExp = op.getRangeMapExpression().getValue();
-        ILogicalExpression otherRangeMapExp = otherOp.getRangeMapExpression().getValue();
-        return rangeMapExp.equals(otherRangeMapExp) && op.getRangeMapKey().equals(otherOp.getRangeMapKey());
+        ILogicalExpression rangeMapExp = op.getSideDataExpression().getValue();
+        ILogicalExpression otherRangeMapExp = otherOp.getSideDataExpression().getValue();
+        return rangeMapExp.equals(otherRangeMapExp) && op.getSideDataKey().equals(otherOp.getSideDataKey());
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 99e852d..34b0ae6 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -590,8 +590,8 @@
 
     @Override
     public ILogicalOperator visitForwardOperator(ForwardOperator op, ILogicalOperator arg) throws AlgebricksException {
-        ForwardOperator opCopy = new ForwardOperator(op.getRangeMapKey(),
-                exprDeepCopyVisitor.deepCopyExpressionReference(op.getRangeMapExpression()));
+        ForwardOperator opCopy = new ForwardOperator(op.getSideDataKey(),
+                exprDeepCopyVisitor.deepCopyExpressionReference(op.getSideDataExpression()));
         deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
         return opCopy;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 5be91cc..1727d10 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -345,7 +345,7 @@
 
     @Override
     public ILogicalOperator visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
-        return new ForwardOperator(op.getRangeMapKey(), deepCopyExpressionRef(op.getRangeMapExpression()));
+        return new ForwardOperator(op.getSideDataKey(), deepCopyExpressionRef(op.getSideDataExpression()));
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 550a208..028bf9f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -454,7 +454,7 @@
     @Override
     public Void visitForwardOperator(ForwardOperator op, Pair<LogicalVariable, LogicalVariable> arg)
             throws AlgebricksException {
-        op.getRangeMapExpression().getValue().substituteVar(arg.first, arg.second);
+        op.getSideDataExpression().getValue().substituteVar(arg.first, arg.second);
         substVarTypes(op, arg);
         return null;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 39b9689..845a853 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -428,7 +428,7 @@
 
     @Override
     public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
-        op.getRangeMapExpression().getValue().getUsedVariables(usedVariables);
+        op.getSideDataExpression().getValue().getUsedVariables(usedVariables);
         return null;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractForwardPOperator.java
similarity index 83%
rename from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
rename to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractForwardPOperator.java
index 11c584e..778af18 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractForwardPOperator.java
@@ -39,17 +39,17 @@
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.std.misc.ForwardOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractForwardOperatorDescriptor;
 
 /**
  * <pre>
- * {@see {@link ForwardOperator} and {@link ForwardOperatorDescriptor}}
- * idx0: Input data source --
- *                           |-- forward op.
- * idx1: RangeMap generator--
+ * {@see {@link ForwardOperator} and {@link AbstractForwardOperatorDescriptor}}
+ * idx0: Input data source    --
+ *                              |-- forward op.
+ * idx1: Side activity output --
  * </pre>
  */
-public class ForwardPOperator extends AbstractPhysicalOperator {
+public abstract class AbstractForwardPOperator extends AbstractPhysicalOperator {
 
     @Override
     public PhysicalOperatorTag getOperatorTag() {
@@ -57,8 +57,18 @@
     }
 
     /**
-     * Forward operator requires that the global aggregate operator broadcasts the range map. No required properties at
-     * the data source input.
+     * Get the correct Forward Operator Descriptor
+     * @param builder Hyracks job builder
+     * @param forwardOp Forward Operator
+     * @param  dataInputDescriptor Data input descriptor
+     * @return return the correct operator descriptor
+     */
+    public abstract AbstractForwardOperatorDescriptor getOperatorDescriptor(IHyracksJobBuilder builder,
+            ForwardOperator forwardOp, RecordDescriptor dataInputDescriptor);
+
+    /**
+     * Forward operator requires that the global aggregate operator broadcasts side activity output.
+     * No required properties at the data source input.
      * @param op {@see {@link ForwardOperator}}
      * @param requiredByParent parent's requirements, which are not enforced for now, as we only explore one plan
      * @param context the optimization context
@@ -67,7 +77,7 @@
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector requiredByParent, IOptimizationContext context) {
-        // broadcast the range map to the cluster node domain
+        // broadcast the side activity output to the cluster node domain
         INodeDomain targetDomain = context.getComputationNodeDomain();
         List<ILocalStructuralProperty> noProp = new ArrayList<>();
         StructuralPropertiesVector[] requiredAtInputs = new StructuralPropertiesVector[2];
@@ -108,13 +118,13 @@
         ForwardOperator forwardOp = (ForwardOperator) op;
         RecordDescriptor dataInputDescriptor = JobGenHelper.mkRecordDescriptor(
                 context.getTypeEnvironment(forwardOp.getInputs().get(0).getValue()), inputSchemas[0], context);
-        ForwardOperatorDescriptor forwardDescriptor =
-                new ForwardOperatorDescriptor(builder.getJobSpec(), forwardOp.getRangeMapKey(), dataInputDescriptor);
+        AbstractForwardOperatorDescriptor forwardDescriptor =
+                getOperatorDescriptor(builder, forwardOp, dataInputDescriptor);
         builder.contributeHyracksOperator(forwardOp, forwardDescriptor);
         ILogicalOperator dataSource = forwardOp.getInputs().get(0).getValue();
         builder.contributeGraphEdge(dataSource, 0, forwardOp, 0);
-        ILogicalOperator rangemapSource = forwardOp.getInputs().get(1).getValue();
-        builder.contributeGraphEdge(rangemapSource, 0, forwardOp, 1);
+        ILogicalOperator sideDataSource = forwardOp.getInputs().get(1).getValue();
+        builder.contributeGraphEdge(sideDataSource, 0, forwardOp, 1);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortForwardPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortForwardPOperator.java
new file mode 100644
index 0000000..0903b5e
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortForwardPOperator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractForwardOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.SortForwardOperatorDescriptor;
+
+public class SortForwardPOperator extends AbstractForwardPOperator {
+
+    @Override
+    public AbstractForwardOperatorDescriptor getOperatorDescriptor(IHyracksJobBuilder builder,
+            ForwardOperator forwardOp, RecordDescriptor dataInputDescriptor) {
+        return new SortForwardOperatorDescriptor(builder.getJobSpec(), forwardOp.getSideDataKey(), dataInputDescriptor);
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 27d4ced..4128c8b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -467,7 +467,7 @@
     @Override
     public Void visitForwardOperator(ForwardOperator op, Integer indent) throws AlgebricksException {
         addIndent(indent)
-                .append("forward: range-map = " + op.getRangeMapExpression().getValue().accept(exprVisitor, indent));
+                .append("forward: range-map = " + op.getSideDataExpression().getValue().accept(exprVisitor, indent));
         return null;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index f5ff12f..e56a8bd 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -639,7 +639,7 @@
         addIndent(indent).append("\"operator\": \"forward\"");
         addIndent(0).append(",\n");
         addIndent(indent).append("\"expressions\": \""
-                + op.getRangeMapExpression().getValue().accept(exprVisitor, indent).replace('"', ' ') + "\"");
+                + op.getSideDataExpression().getValue().accept(exprVisitor, indent).replace('"', ' ') + "\"");
         return null;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 1c761fa..795fb10 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -590,7 +590,7 @@
     @Override
     public String visitForwardOperator(ForwardOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
-        stringBuilder.append("forward(").append(op.getRangeMapExpression().getValue().toString()).append(")");
+        stringBuilder.append("forward(").append(op.getSideDataExpression().getValue().toString()).append(")");
         appendSchema(op, showDetails);
         appendAnnotations(op, showDetails);
         appendPhysicalOperatorInfo(op, showDetails);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index f66a6b8..7dc596c 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -65,7 +65,6 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.ForwardPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
@@ -76,6 +75,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.SequentialMergeExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortForwardPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
@@ -880,7 +880,7 @@
         AbstractLogicalExpression rangeMapExpression = new VariableReferenceExpression(rangeMapVariable, sourceLoc);
         ForwardOperator forwardOperator = new ForwardOperator(rangeMapKey, new MutableObject<>(rangeMapExpression));
         forwardOperator.setSourceLocation(sourceLoc);
-        forwardOperator.setPhysicalOperator(new ForwardPOperator());
+        forwardOperator.setPhysicalOperator(new SortForwardPOperator());
         forwardOperator.getInputs().add(exchangeOpFromReplicate);
         forwardOperator.getInputs().add(globalAggInput);
         OperatorManipulationUtil.setOperatorMode(forwardOperator);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 612f79e..a7bf11e 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -61,7 +61,6 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.ForwardPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator;
@@ -78,6 +77,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortForwardPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.SplitPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
@@ -398,7 +398,7 @@
                     break;
                 }
                 case FORWARD:
-                    op.setPhysicalOperator(new ForwardPOperator());
+                    op.setPhysicalOperator(new SortForwardPOperator());
                     break;
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
new file mode 100644
index 0000000..b57ad16
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.base;
+
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+
+// TODO(ali): forward operator should probably be moved to asterix layer
+public abstract class AbstractForwardOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    protected static final int FORWARD_DATA_ACTIVITY_ID = 0;
+    protected static final int SIDE_DATA_ACTIVITY_ID = 1;
+    protected String sideDataKey;
+
+    /**
+     * @param spec used to create the operator id.
+     * @param sideDataKey the key used to store the output of the side activity
+     * @param outputRecordDescriptor the output schema of this operator.
+     */
+    public AbstractForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, String sideDataKey,
+            RecordDescriptor outputRecordDescriptor) {
+        super(spec, 2, 1);
+        outRecDescs[0] = outputRecordDescriptor;
+        this.sideDataKey = sideDataKey;
+    }
+
+    /**
+     * @return the forward data activity
+     */
+    public abstract AbstractActivityNode createForwardDataActivity();
+
+    /**
+     * @return the side data activity
+     */
+    public abstract AbstractActivityNode createSideDataActivity();
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        AbstractActivityNode forwardDataActivity = createForwardDataActivity();
+        AbstractActivityNode sideDataActivity = createSideDataActivity();
+
+        // side data activity, its input is coming through the operator's in-port = 1 & activity's in-port = 0
+        builder.addActivity(this, sideDataActivity);
+        builder.addSourceEdge(1, sideDataActivity, 0);
+
+        // forward data activity, its input is coming through the operator's in-port = 0 & activity's in-port = 0
+        builder.addActivity(this, forwardDataActivity);
+        builder.addSourceEdge(0, forwardDataActivity, 0);
+
+        // forward data activity will wait for the side data activity
+        builder.addBlockingEdge(sideDataActivity, forwardDataActivity);
+
+        // data leaves from the operator's out-port = 0 & forward data activity's out-port = 0
+        builder.addTargetEdge(0, forwardDataActivity, 0);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
similarity index 83%
rename from hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
rename to hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
index 49eea0a..1daf9fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
@@ -24,7 +24,6 @@
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.TaskId;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -42,50 +41,33 @@
 import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
 import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractForwardOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 // TODO(ali): forward operator should probably be moved to asterix layer
-public class ForwardOperatorDescriptor extends AbstractOperatorDescriptor {
+public class SortForwardOperatorDescriptor extends AbstractForwardOperatorDescriptor {
     private static final long serialVersionUID = 1L;
-    private static final int FORWARD_DATA_ACTIVITY_ID = 0;
-    private static final int RANGEMAP_READER_ACTIVITY_ID = 1;
-    private final String rangeMapKeyInContext;
 
     /**
      * @param spec used to create the operator id.
-     * @param rangeMapKeyInContext the unique key to store the range map in the shared map & transfer it to partitioner.
+     * @param sideDataKey the unique key to store the range map in the shared map & transfer it to partitioner.
      * @param outputRecordDescriptor the output schema of this operator.
      */
-    public ForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, String rangeMapKeyInContext,
+    public SortForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, String sideDataKey,
             RecordDescriptor outputRecordDescriptor) {
-        super(spec, 2, 1);
-        this.rangeMapKeyInContext = rangeMapKeyInContext;
-        outRecDescs[0] = outputRecordDescriptor;
+        super(spec, sideDataKey, outputRecordDescriptor);
     }
 
     @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        ForwardDataActivity forwardDataActivity =
-                new ForwardDataActivity(new ActivityId(odId, FORWARD_DATA_ACTIVITY_ID));
-        RangeMapReaderActivity rangeMapReaderActivity =
-                new RangeMapReaderActivity(new ActivityId(odId, RANGEMAP_READER_ACTIVITY_ID));
+    public AbstractActivityNode createForwardDataActivity() {
+        return new ForwardDataActivity(new ActivityId(odId, FORWARD_DATA_ACTIVITY_ID));
+    }
 
-        // range map reader activity, its input is coming through the operator's in-port = 1 & activity's in-port = 0
-        builder.addActivity(this, rangeMapReaderActivity);
-        builder.addSourceEdge(1, rangeMapReaderActivity, 0);
-
-        // forward data activity, its input is coming through the operator's in-port = 0 & activity's in-port = 0
-        builder.addActivity(this, forwardDataActivity);
-        builder.addSourceEdge(0, forwardDataActivity, 0);
-
-        // forward data activity will wait for the range map reader activity
-        builder.addBlockingEdge(rangeMapReaderActivity, forwardDataActivity);
-
-        // data leaves from the operator's out-port = 0 & forward data activity's out-port = 0
-        builder.addTargetEdge(0, forwardDataActivity, 0);
+    @Override
+    public AbstractActivityNode createSideDataActivity() {
+        return new RangeMapReaderActivity(new ActivityId(odId, SIDE_DATA_ACTIVITY_ID));
     }
 
     /**
@@ -221,9 +203,9 @@
         public void open() throws HyracksDataException {
             // retrieve the range map from the state object (previous activity should have already stored it)
             // then deposit it into the ctx so that MToN-partition can pick it up
-            Object stateObjKey = new TaskId(new ActivityId(odId, RANGEMAP_READER_ACTIVITY_ID), partition);
+            Object stateObjKey = new TaskId(new ActivityId(odId, SIDE_DATA_ACTIVITY_ID), partition);
             RangeMapState rangeMapState = (RangeMapState) ctx.getStateObject(stateObjKey);
-            TaskUtil.put(rangeMapKeyInContext, rangeMapState.rangeMap, ctx);
+            TaskUtil.put(sideDataKey, rangeMapState.rangeMap, ctx);
             writer.open();
         }