[ASTERIXDB-3332][COMP] Consolidate projection and filters

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Consolidate projected fields and filter expressions when
scanning (or searching) the same dataset.

Change-Id: I3d8122e71a949c18b39efa8d92ea169173bbdb61
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18063
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/NOTICE b/asterixdb/NOTICE
index a46bfe7..06d538d 100644
--- a/asterixdb/NOTICE
+++ b/asterixdb/NOTICE
@@ -1,5 +1,5 @@
 Apache AsterixDB
-Copyright 2015-2023 The Apache Software Foundation
+Copyright 2015-2024 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InlineAndRemoveRedundantBooleanExpressionsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InlineAndRemoveRedundantBooleanExpressionsRule.java
index 7eebf1c..dfd1781 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InlineAndRemoveRedundantBooleanExpressionsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InlineAndRemoveRedundantBooleanExpressionsRule.java
@@ -86,7 +86,7 @@
         return changed;
     }
 
-    private boolean removeRedundantExpressions(List<Mutable<ILogicalExpression>> exprs) {
+    public static boolean removeRedundantExpressions(List<Mutable<ILogicalExpression>> exprs) {
         final int originalSize = exprs.size();
         int i = 0;
         while (i < exprs.size()) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index f55c4a5..21d6358 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -31,8 +31,9 @@
 import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnFilterPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnRangeFilterPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnValueAccessPushdownProcessor;
+import org.apache.asterix.optimizer.rules.pushdown.processor.ConsolidateProjectionAndFilterExpressionsProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
-import org.apache.asterix.optimizer.rules.pushdown.processor.InlineFilterExpressionsProcessor;
+import org.apache.asterix.optimizer.rules.pushdown.processor.InlineAndNormalizeFilterExpressionsProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -117,8 +118,10 @@
         }
         // Performs prefix pushdowns
         pushdownProcessorsExecutor.add(new ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
+        pushdownProcessorsExecutor
+                .add(new ConsolidateProjectionAndFilterExpressionsProcessor(pushdownContext, context));
         // Inlines AND/OR expression (must be last to run)
-        pushdownProcessorsExecutor.add(new InlineFilterExpressionsProcessor(pushdownContext, context));
+        pushdownProcessorsExecutor.add(new InlineAndNormalizeFilterExpressionsProcessor(pushdownContext, context));
     }
 
     /**
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
index e565b8d..92dbff8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
@@ -63,6 +63,7 @@
     private final Map<LogicalVariable, List<UseDescriptor>> useChain;
     private final List<ILogicalOperator> scopes;
     private final Map<ILogicalOperator, ILogicalExpression> inlinedCache;
+    private final Map<Dataset, List<ScanDefineDescriptor>> datasetToScans;
 
     public PushdownContext() {
         registeredScans = new ArrayList<>();
@@ -71,6 +72,7 @@
         this.useChain = new HashMap<>();
         scopes = new ArrayList<>();
         inlinedCache = new HashMap<>();
+        datasetToScans = new HashMap<>();
     }
 
     public void enterScope(ILogicalOperator operator) {
@@ -94,6 +96,8 @@
             useChain.put(pkVar, new ArrayList<>());
         }
         registeredScans.add(scanDefDesc);
+        List<ScanDefineDescriptor> datasetScans = datasetToScans.computeIfAbsent(dataset, k -> new ArrayList<>());
+        datasetScans.add(scanDefDesc);
     }
 
     public void define(LogicalVariable variable, ILogicalOperator operator, ILogicalExpression expression,
@@ -171,6 +175,10 @@
         return inlinedExpr.cloneExpression();
     }
 
+    public Map<Dataset, List<ScanDefineDescriptor>> getDatasetToScanDefinitionDescriptors() {
+        return datasetToScans;
+    }
+
     private ILogicalExpression cloneAndInline(ILogicalExpression expression, IOptimizationContext context)
             throws CompilationException {
         switch (expression.getExpressionTag()) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ConsolidateProjectionAndFilterExpressionsProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ConsolidateProjectionAndFilterExpressionsProcessor.java
new file mode 100644
index 0000000..f9136d2
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ConsolidateProjectionAndFilterExpressionsProcessor.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression.TRUE;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.RootExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaMergerVisitor;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.AllVariablesSubstituteVisitor;
+
+/**
+ *
+ */
+public class ConsolidateProjectionAndFilterExpressionsProcessor extends AbstractPushdownProcessor {
+    private final AllVariablesSubstituteVisitor substituteVisitor;
+    private final ExpectedSchemaMergerVisitor schemaMergerVisitor;
+
+    public ConsolidateProjectionAndFilterExpressionsProcessor(PushdownContext pushdownContext,
+            IOptimizationContext context) {
+        super(pushdownContext, context);
+        substituteVisitor = new AllVariablesSubstituteVisitor();
+        schemaMergerVisitor = new ExpectedSchemaMergerVisitor();
+    }
+
+    @Override
+    public boolean process() throws AlgebricksException {
+        boolean changed = false;
+        Collection<List<ScanDefineDescriptor>> scanDescriptors =
+                pushdownContext.getDatasetToScanDefinitionDescriptors().values();
+        for (List<ScanDefineDescriptor> descriptors : scanDescriptors) {
+            changed |= consolidate(descriptors);
+        }
+        return changed;
+    }
+
+    private boolean consolidate(List<ScanDefineDescriptor> scanDefineDescriptors) throws AlgebricksException {
+        if (scanDefineDescriptors.size() <= 1) {
+            return false;
+        }
+        Map<ILogicalExpression, ARecordType> paths = new HashMap<>();
+        Map<String, FunctionCallInformation> sourceInformationMap = new HashMap<>();
+        ILogicalExpression rangeFilterExpr = TRUE;
+        ILogicalExpression filterExpr = TRUE;
+        RootExpectedSchemaNode mergedRoot = null;
+        RootExpectedSchemaNode mergedMetaRoot = null;
+
+        // First combine filters and projected fields
+        for (ScanDefineDescriptor descriptor : scanDefineDescriptors) {
+            Map<ILogicalExpression, ARecordType> scanPaths = descriptor.getFilterPaths();
+            paths.putAll(scanPaths);
+
+            sourceInformationMap.putAll(descriptor.getPathLocations());
+
+            rangeFilterExpr = or(rangeFilterExpr, descriptor.getRangeFilterExpression());
+
+            filterExpr = or(filterExpr, descriptor.getFilterExpression());
+
+            mergedRoot = schemaMergerVisitor.merge(mergedRoot, descriptor.getRecordNode());
+            mergedMetaRoot = schemaMergerVisitor.merge(mergedMetaRoot, descriptor.getMetaNode());
+        }
+
+        // Set the consolidated pushdown information
+        for (ScanDefineDescriptor descriptor : scanDefineDescriptors) {
+            Map<ILogicalExpression, ARecordType> descriptorPaths = descriptor.getFilterPaths();
+            LogicalVariable scanVariable = descriptor.getVariable();
+            descriptorPaths.clear();
+            cloneAndSubstituteVariable(scanVariable, paths, descriptorPaths);
+
+            Map<String, FunctionCallInformation> decsriptorSourceInformationMap = descriptor.getPathLocations();
+            decsriptorSourceInformationMap.putAll(sourceInformationMap);
+
+            descriptor.setRangeFilterExpression(cloneAndSubstituteVariable(scanVariable, rangeFilterExpr));
+
+            descriptor.setFilterExpression(cloneAndSubstituteVariable(scanVariable, filterExpr));
+            descriptor.setRecordNode(mergedRoot);
+            descriptor.setMetaNode(mergedMetaRoot);
+        }
+
+        return true;
+    }
+
+    private AbstractFunctionCallExpression or(ILogicalExpression mergedExpr, ILogicalExpression expr) {
+        if (expr == null || mergedExpr == null) {
+            return null;
+        }
+        AbstractFunctionCallExpression orExpr = mergedExpr == TRUE ? null : (AbstractFunctionCallExpression) mergedExpr;
+        if (orExpr == null) {
+            MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+            IFunctionInfo fInfo = metadataProvider.lookupFunction(BuiltinFunctions.OR);
+            orExpr = new ScalarFunctionCallExpression(fInfo);
+        }
+        orExpr.getArguments().add(new MutableObject<>(expr));
+        return orExpr;
+    }
+
+    private void cloneAndSubstituteVariable(LogicalVariable scanVariable, Map<ILogicalExpression, ARecordType> src,
+            Map<ILogicalExpression, ARecordType> dest) throws AlgebricksException {
+        for (Map.Entry<ILogicalExpression, ARecordType> srcEntry : src.entrySet()) {
+            ILogicalExpression substitutedExpr = srcEntry.getKey().cloneExpression();
+            substitutedExpr.accept(substituteVisitor, scanVariable);
+            dest.put(substitutedExpr, srcEntry.getValue());
+        }
+    }
+
+    private ILogicalExpression cloneAndSubstituteVariable(LogicalVariable scanVariable, ILogicalExpression expression)
+            throws AlgebricksException {
+        if (expression == null) {
+            return null;
+        }
+
+        ILogicalExpression clonedExpr = expression.cloneExpression();
+        clonedExpr.accept(substituteVisitor, scanVariable);
+        return clonedExpr;
+    }
+
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/InlineFilterExpressionsProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/InlineAndNormalizeFilterExpressionsProcessor.java
similarity index 83%
rename from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/InlineFilterExpressionsProcessor.java
rename to asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/InlineAndNormalizeFilterExpressionsProcessor.java
index 3939873..b4292ff 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/InlineFilterExpressionsProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/InlineAndNormalizeFilterExpressionsProcessor.java
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.optimizer.rules.InlineAndRemoveRedundantBooleanExpressionsRule;
 import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
 import org.apache.commons.lang3.mutable.Mutable;
@@ -35,13 +36,15 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 
 /**
- * Inline filter expressions of a scan.
- * E.g.,
+ * Inline and normalize filter expressions of a scan.
+ * Inline example:
  * and(a > 2) --> a > 2
  * and(a > 2, and(b > 2)) --> and(a > 2, b > 2)
+ * Normalization example
+ * and(a, a) -> a
  */
-public class InlineFilterExpressionsProcessor extends AbstractPushdownProcessor {
-    public InlineFilterExpressionsProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
+public class InlineAndNormalizeFilterExpressionsProcessor extends AbstractPushdownProcessor {
+    public InlineAndNormalizeFilterExpressionsProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
         super(pushdownContext, context);
     }
 
@@ -91,6 +94,15 @@
             args.addAll(inlinedArgs);
         }
 
+        // Remove redundant expressions from AND/OR
+        if (isAnd(funcExpr) || isOr(funcExpr)) {
+            InlineAndRemoveRedundantBooleanExpressionsRule.removeRedundantExpressions(args);
+            if (args.size() == 1) {
+                // InlineAndRemoveRedundantBooleanExpressionsRule produced a single argument return it
+                return args.get(0).getValue();
+            }
+        }
+
         // either the original expression or the inlined AND/OR expression
         return expression;
     }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractExpectedSchemaNode.java
index 3c6cc95..b895781 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractExpectedSchemaNode.java
@@ -47,7 +47,8 @@
         return functionName;
     }
 
-    protected void setParent(AbstractComplexExpectedSchemaNode parent) {
+    @Override
+    public void setParent(AbstractComplexExpectedSchemaNode parent) {
         this.parent = parent;
     }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java
index b000dc8..d7c4948 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java
@@ -23,7 +23,7 @@
 public class ArrayExpectedSchemaNode extends AbstractComplexExpectedSchemaNode {
     private IExpectedSchemaNode child;
 
-    ArrayExpectedSchemaNode(AbstractComplexExpectedSchemaNode parent, SourceLocation sourceLocation,
+    public ArrayExpectedSchemaNode(AbstractComplexExpectedSchemaNode parent, SourceLocation sourceLocation,
             String functionName) {
         super(parent, sourceLocation, functionName);
     }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ExpectedSchemaBuilder.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ExpectedSchemaBuilder.java
index 6b21508..fa3c196 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ExpectedSchemaBuilder.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ExpectedSchemaBuilder.java
@@ -210,7 +210,9 @@
             IExpectedSchemaNode child) throws AlgebricksException {
         UnionExpectedSchemaNode unionNode = (UnionExpectedSchemaNode) parent;
         ExpectedSchemaNodeType parentType = getExpectedNestedNodeType(parentExpr);
-        addChild(parentExpr, null, unionNode.getChild(parentType), child);
+        AbstractComplexExpectedSchemaNode actualParent = unionNode.getChild(parentType);
+        child.setParent(actualParent);
+        addChild(parentExpr, null, actualParent, child);
     }
 
     private static boolean isVariable(ILogicalExpression expr) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/IExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/IExpectedSchemaNode.java
index ed0001a..345cb84 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/IExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/IExpectedSchemaNode.java
@@ -47,6 +47,13 @@
     AbstractComplexExpectedSchemaNode getParent();
 
     /**
+     * Set parent of a node
+     *
+     * @param parent new parent
+     */
+    void setParent(AbstractComplexExpectedSchemaNode parent);
+
+    /**
      * For visiting a node
      *
      * @param visitor schema node visitor
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java
index ff89398..2745a69 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java
@@ -20,7 +20,6 @@
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
@@ -29,14 +28,18 @@
 public class ObjectExpectedSchemaNode extends AbstractComplexExpectedSchemaNode {
     private final Map<String, IExpectedSchemaNode> children;
 
-    ObjectExpectedSchemaNode(AbstractComplexExpectedSchemaNode parent, SourceLocation sourceLocation,
+    public ObjectExpectedSchemaNode(AbstractComplexExpectedSchemaNode parent, SourceLocation sourceLocation,
             String functionName) {
         super(parent, sourceLocation, functionName);
         children = new HashMap<>();
     }
 
-    public Set<Map.Entry<String, IExpectedSchemaNode>> getChildren() {
-        return children.entrySet();
+    public boolean isRoot() {
+        return false;
+    }
+
+    public Map<String, IExpectedSchemaNode> getChildren() {
+        return children;
     }
 
     public void addChild(String fieldName, IExpectedSchemaNode child) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/RootExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/RootExpectedSchemaNode.java
index 189505b..672de68 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/RootExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/RootExpectedSchemaNode.java
@@ -45,6 +45,11 @@
     }
 
     @Override
+    public boolean isRoot() {
+        return true;
+    }
+
+    @Override
     public AbstractComplexExpectedSchemaNode replaceIfNeeded(ExpectedSchemaNodeType expectedNodeType,
             SourceLocation sourceLocation, String functionName) {
         if (rootType == ALL_FIELDS_ROOT) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/UnionExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/UnionExpectedSchemaNode.java
index 3a675b8..1bd316b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/UnionExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/UnionExpectedSchemaNode.java
@@ -27,7 +27,7 @@
 public class UnionExpectedSchemaNode extends AbstractComplexExpectedSchemaNode {
     private final Map<ExpectedSchemaNodeType, AbstractComplexExpectedSchemaNode> children;
 
-    protected UnionExpectedSchemaNode(AbstractComplexExpectedSchemaNode parent, SourceLocation sourceLocation,
+    public UnionExpectedSchemaNode(AbstractComplexExpectedSchemaNode parent, SourceLocation sourceLocation,
             String functionName) {
         super(parent, sourceLocation, functionName);
         children = new EnumMap<>(ExpectedSchemaNodeType.class);
@@ -42,7 +42,7 @@
         throw new UnsupportedOperationException("Cannot replace a child of UNION");
     }
 
-    protected void addChild(AbstractComplexExpectedSchemaNode node) {
+    public void addChild(AbstractComplexExpectedSchemaNode node) {
         children.put(node.getType(), node);
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpectedSchemaMergerVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpectedSchemaMergerVisitor.java
new file mode 100644
index 0000000..d9ab052
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpectedSchemaMergerVisitor.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.visitor;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.optimizer.rules.pushdown.schema.AbstractComplexExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ArrayExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNodeVisitor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ObjectExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.RootExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.UnionExpectedSchemaNode;
+
+public class ExpectedSchemaMergerVisitor
+        implements IExpectedSchemaNodeVisitor<IExpectedSchemaNode, IExpectedSchemaNode> {
+    private AbstractComplexExpectedSchemaNode currentParent;
+
+    public RootExpectedSchemaNode merge(RootExpectedSchemaNode left, RootExpectedSchemaNode right) {
+        if (left == null) {
+            return right;
+        } else if (right == null) {
+            return left;
+        }
+
+        return visit(left, right);
+    }
+
+    @Override
+    public RootExpectedSchemaNode visit(RootExpectedSchemaNode node, IExpectedSchemaNode arg) {
+        if (!(arg instanceof RootExpectedSchemaNode)) {
+            // Safeguard against merging root with non-root
+            throw new IllegalStateException("Cannot merge root with non-root node");
+        }
+
+        // The other must be root
+        RootExpectedSchemaNode argRoot = (RootExpectedSchemaNode) arg;
+        if (node.isAllFields() || argRoot.isAllFields()) {
+            // if either of the two roots require the entire fields, then return all fields for both
+            return RootExpectedSchemaNode.ALL_FIELDS_ROOT_IRREPLACEABLE_NODE;
+        }
+
+        // if either the root or argRoot is empty, then return the non-empty node
+        if (argRoot.isEmpty()) {
+            return node;
+        } else if (node.isEmpty()) {
+            return argRoot;
+        }
+
+        // combine
+        RootExpectedSchemaNode mergedRoot = (RootExpectedSchemaNode) RootExpectedSchemaNode.ALL_FIELDS_ROOT_NODE
+                .replaceIfNeeded(ExpectedSchemaNodeType.OBJECT, node.getSourceLocation(), node.getFunctionName());
+        mergeObjectFields(mergedRoot, node.getChildren(), argRoot.getChildren());
+        return mergedRoot;
+    }
+
+    @Override
+    public IExpectedSchemaNode visit(ObjectExpectedSchemaNode node, IExpectedSchemaNode arg) {
+        ObjectExpectedSchemaNode mergedObject =
+                new ObjectExpectedSchemaNode(currentParent, node.getSourceLocation(), node.getFunctionName());
+        Map<String, IExpectedSchemaNode> argChildren = Collections.emptyMap();
+        if (arg != null) {
+            ObjectExpectedSchemaNode argObject = (ObjectExpectedSchemaNode) arg;
+            argChildren = argObject.getChildren();
+        }
+
+        mergeObjectFields(mergedObject, node.getChildren(), argChildren);
+
+        return mergedObject;
+    }
+
+    @Override
+    public IExpectedSchemaNode visit(ArrayExpectedSchemaNode node, IExpectedSchemaNode arg) {
+        IExpectedSchemaNode nodeItem = node.getChild();
+        IExpectedSchemaNode argItem = null;
+        if (arg != null) {
+            ArrayExpectedSchemaNode arrayArg = (ArrayExpectedSchemaNode) arg;
+            argItem = arrayArg.getChild();
+        }
+        ArrayExpectedSchemaNode mergedArray =
+                new ArrayExpectedSchemaNode(currentParent, node.getSourceLocation(), node.getFunctionName());
+        AbstractComplexExpectedSchemaNode previousParent = currentParent;
+        currentParent = mergedArray;
+        IExpectedSchemaNode mergedItem = merge(nodeItem, argItem);
+        mergedArray.addChild(mergedItem);
+        currentParent = previousParent;
+
+        return mergedArray;
+    }
+
+    @Override
+    public IExpectedSchemaNode visit(UnionExpectedSchemaNode node, IExpectedSchemaNode arg) {
+        UnionExpectedSchemaNode union =
+                new UnionExpectedSchemaNode(currentParent, node.getSourceLocation(), node.getFunctionName());
+        AbstractComplexExpectedSchemaNode previousParent = currentParent;
+        currentParent = union;
+
+        if (arg == null) {
+            // arg is null, make a copy of the current union children
+            for (Map.Entry<ExpectedSchemaNodeType, AbstractComplexExpectedSchemaNode> nodeChild : node.getChildren()) {
+                IExpectedSchemaNode mergedChild = merge(nodeChild.getValue(), null);
+                union.addChild((AbstractComplexExpectedSchemaNode) mergedChild);
+            }
+        } else if (arg.getType() == ExpectedSchemaNodeType.UNION) {
+            // Merging two unions
+            UnionExpectedSchemaNode argUnion = (UnionExpectedSchemaNode) arg;
+            for (Map.Entry<ExpectedSchemaNodeType, AbstractComplexExpectedSchemaNode> nodeChild : node.getChildren()) {
+                IExpectedSchemaNode child = nodeChild.getValue();
+                IExpectedSchemaNode argChild = argUnion.getChild(child.getType());
+                IExpectedSchemaNode mergedChild = merge(child, argChild);
+                union.addChild((AbstractComplexExpectedSchemaNode) mergedChild);
+            }
+        } else {
+            // Merging a union with either an array or an object
+            for (Map.Entry<ExpectedSchemaNodeType, AbstractComplexExpectedSchemaNode> nodeChild : node.getChildren()) {
+                IExpectedSchemaNode child = nodeChild.getValue();
+                IExpectedSchemaNode argNode = child.getType() == arg.getType() ? arg : null;
+                IExpectedSchemaNode mergedChild = merge(child, argNode);
+                union.addChild((AbstractComplexExpectedSchemaNode) mergedChild);
+            }
+        }
+
+        currentParent = previousParent;
+
+        return union;
+    }
+
+    @Override
+    public IExpectedSchemaNode visit(AnyExpectedSchemaNode node, IExpectedSchemaNode arg) {
+        return new AnyExpectedSchemaNode(currentParent, node.getSourceLocation(), node.getFunctionName());
+    }
+
+    private void mergeObjectFields(ObjectExpectedSchemaNode objectNode, Map<String, IExpectedSchemaNode> left,
+            Map<String, IExpectedSchemaNode> right) {
+        AbstractComplexExpectedSchemaNode previousParent = currentParent;
+        currentParent = objectNode;
+        Set<String> mergedFields = new HashSet<>();
+        mergeObjectFields(objectNode, left, right, mergedFields);
+        mergeObjectFields(objectNode, right, left, mergedFields);
+        currentParent = previousParent;
+    }
+
+    private void mergeObjectFields(ObjectExpectedSchemaNode objectNode, Map<String, IExpectedSchemaNode> left,
+            Map<String, IExpectedSchemaNode> right, Set<String> mergedFields) {
+        for (Map.Entry<String, IExpectedSchemaNode> leftChild : left.entrySet()) {
+            String fieldName = leftChild.getKey();
+            if (mergedFields.contains(fieldName)) {
+                continue;
+            }
+            IExpectedSchemaNode rightChild = right.get(fieldName);
+            IExpectedSchemaNode mergedChild = merge(leftChild.getValue(), rightChild);
+            objectNode.addChild(fieldName, mergedChild);
+            mergedFields.add(fieldName);
+        }
+    }
+
+    private IExpectedSchemaNode merge(IExpectedSchemaNode leftChild, IExpectedSchemaNode rightChild) {
+        if (rightChild == null || leftChild.getType() == ExpectedSchemaNodeType.ANY
+                || leftChild.getType() == ExpectedSchemaNodeType.UNION) {
+            // if rightChild is null then visit leftChild to create a new copy of it
+            // else if leftChild is ANY then return ANY (as everything is required from this child)
+            // else if leftChild is UNION, then visit left first
+            return leftChild.accept(this, rightChild);
+        } else if (rightChild.getType() == ExpectedSchemaNodeType.ANY
+                || rightChild.getType() == ExpectedSchemaNodeType.UNION) {
+            // if rightChild is ANY then return ANY (as everything is required from this child)
+            // else if rightChild is UNION, then visit right first
+            return rightChild.accept(this, leftChild);
+        } else if (leftChild.getType() != rightChild.getType()) {
+            // leftChild and rightChild are not the same and both are either object or array
+            return createUnionNode(leftChild, rightChild);
+        }
+
+        return leftChild.accept(this, rightChild);
+    }
+
+    private IExpectedSchemaNode createUnionNode(IExpectedSchemaNode leftChild, IExpectedSchemaNode rightChild) {
+        UnionExpectedSchemaNode union =
+                new UnionExpectedSchemaNode(currentParent, leftChild.getSourceLocation(), leftChild.getFunctionName());
+        AbstractComplexExpectedSchemaNode previousParent = currentParent;
+        currentParent = union;
+        // Create a copy of leftChild
+        IExpectedSchemaNode leftNewChild = leftChild.accept(this, null);
+        // Create a copy of rightChild
+        IExpectedSchemaNode rightNewChild = rightChild.accept(this, null);
+        // Add both left and right children to the union node
+        union.addChild((AbstractComplexExpectedSchemaNode) leftNewChild);
+        union.addChild((AbstractComplexExpectedSchemaNode) rightNewChild);
+        currentParent = previousParent;
+        return union;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpectedSchemaNodeToIATypeTranslatorVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
index f5461d1..729a686 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
@@ -24,7 +24,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.ARecordType;
@@ -99,11 +98,11 @@
     }
 
     private ARecordType createRecordType(ObjectExpectedSchemaNode node, String arg) {
-        Set<Map.Entry<String, IExpectedSchemaNode>> children = node.getChildren();
+        Map<String, IExpectedSchemaNode> children = node.getChildren();
         String[] childrenFieldNames = new String[children.size()];
         IAType[] childrenTypes = new IAType[children.size()];
         int i = 0;
-        for (Map.Entry<String, IExpectedSchemaNode> child : children) {
+        for (Map.Entry<String, IExpectedSchemaNode> child : children.entrySet()) {
             childrenFieldNames[i] = child.getKey();
             childrenTypes[i++] = child.getValue().accept(this, String.valueOf(counter++));
         }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpressionToExpectedSchemaNodeVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpressionToExpectedSchemaNodeVisitor.java
index 5168505..0f2a36a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpressionToExpectedSchemaNodeVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpressionToExpectedSchemaNodeVisitor.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.optimizer.rules.pushdown.visitor;
 
-import static org.apache.asterix.metadata.utils.PushdownUtil.SUPPORTED_FUNCTIONS;
+import static org.apache.asterix.metadata.utils.PushdownUtil.FILTER_PUSHABLE_PATH_FUNCTIONS;
 
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.schema.AbstractComplexExpectedSchemaNode;
@@ -98,7 +98,7 @@
 
     private IExpectedSchemaNode handleFunction(AbstractFunctionCallExpression expr) throws AlgebricksException {
         FunctionIdentifier fid = expr.getFunctionIdentifier();
-        if (!SUPPORTED_FUNCTIONS.contains(fid)) {
+        if (!FILTER_PUSHABLE_PATH_FUNCTIONS.contains(fid)) {
             // If not a supported function, return null
             return null;
         }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.000.ddl.sqlpp
new file mode 100644
index 0000000..f487146
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.000.ddl.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE CH2Type AS {
+    uid: uuid
+};
+
+CREATE DATASET `orders`(CH2Type) PRIMARY KEY uid AUTOGENERATED WITH {"storage-format": {"format": "column"}};
+CREATE DATASET `stock`(CH2Type) PRIMARY KEY uid AUTOGENERATED WITH {"storage-format": {"format": "column"}};
+CREATE DATASET `supplier`(CH2Type) PRIMARY KEY uid AUTOGENERATED WITH {"storage-format": {"format": "column"}};
+
+CREATE INDEX orderline_delivery_d ON orders(UNNEST o_orderline SELECT ol_delivery_d:STRING) EXCLUDE UNKNOWN KEY;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.010.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.010.query.sqlpp
new file mode 100644
index 0000000..acb0019
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.010.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+WITH revenue AS (
+    SELECT s.s_w_id * s.s_i_id MOD 10000 as supplier_no, SUM(ol.ol_amount) AS total_rev
+    FROM stock s, orders o, o.o_orderline ol
+    WHERE ol.ol_i_id = s.s_i_id
+      AND ol.ol_supply_w_id = s.s_w_id
+      -- With index (replicate should appear after the primary index's unnest-map)
+      AND ol.ol_delivery_d >= '2018-01-01 00:00:00.000000'
+      AND ol.ol_delivery_d < '2018-04-01 00:00:00.000000'
+    GROUP BY s.s_w_id * s.s_i_id MOD 10000
+)
+SELECT su.su_suppkey, su.su_name, su.su_address, su.su_phone, r.total_revenue
+FROM revenue r,  supplier su
+WHERE su.su_suppkey = r.supplier_no
+  AND r.total_revenue = (SELECT VALUE max(r1.total_revenue) FROM revenue r1)[0]
+ORDER BY su.su_suppkey;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.011.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.011.query.sqlpp
new file mode 100644
index 0000000..6f5dca8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.011.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+WITH revenue AS (
+    SELECT s.s_w_id * s.s_i_id MOD 10000 as supplier_no, SUM(ol.ol_amount) AS total_rev
+    FROM stock s, orders o, o.o_orderline ol
+    WHERE ol.ol_i_id = s.s_i_id
+      AND ol.ol_supply_w_id = s.s_w_id
+      -- Without index (replicate should appear after the primary index's data-scan)
+      AND ol.ol_delivery_d /*+skip-index*/ >= '2018-01-01 00:00:00.000000'
+      AND ol.ol_delivery_d /*+skip-index*/ < '2018-04-01 00:00:00.000000'
+    GROUP BY s.s_w_id * s.s_i_id MOD 10000
+)
+SELECT su.su_suppkey, su.su_name, su.su_address, su.su_phone, r.total_revenue
+FROM revenue r,  supplier su -- First access to revenue
+WHERE su.su_suppkey = r.supplier_no
+  AND r.total_revenue = (SELECT VALUE max(r1.total_revenue) FROM revenue r1)[0] -- Second access to revenue
+ORDER BY su.su_suppkey;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.020.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.020.query.sqlpp
new file mode 100644
index 0000000..7775165
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.020.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+
+EXPLAIN
+SELECT COUNT(*)
+FROM orders o1, orders o2
+WHERE o1.o_carrier_id = o2.o_carrier_id
+  -- Both predicates should be pushed and combined as a disjunctive predicate
+  AND o1.o_ol_cnt > 4
+  AND o2.o_ol_cnt = 1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.021.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.021.query.sqlpp
new file mode 100644
index 0000000..b31a03d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.021.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+
+EXPLAIN
+SELECT COUNT(*)
+FROM orders o1, orders o2
+WHERE o1.o_orderline.ol_dist_info = "x"
+  AND (SOME ol IN o2.o_orderline SATISFIES ol.ol_dist_info = "x")
+  -- Set the join condition below to ensure that both filter above are evaluated first
+  AND o1.o_carrier_id = o2.o_carrier_id
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.022.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.022.query.sqlpp
new file mode 100644
index 0000000..6969c11
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.022.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+
+EXPLAIN
+SELECT COUNT(*)
+FROM orders o1, orders o2
+WHERE o1.o_carrier_id = o2.o_carrier_id
+  -- No pushdown should be done here as o1 requires every tuple (i.e., no tuple is filtered out)
+  AND o2.o_ol_cnt = 1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.023.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.023.query.sqlpp
new file mode 100644
index 0000000..3efd2f8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.023.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+
+EXPLAIN
+SELECT COUNT(*)
+FROM orders o1, orders o2
+WHERE o1.o_carrier_id = o2.o_carrier_id
+  -- Disjunctive predicate should be reduced into a single equality check
+  AND o1.o_ol_cnt = 1
+  AND o2.o_ol_cnt = 1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.030.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.030.query.sqlpp
new file mode 100644
index 0000000..9b93cc5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.030.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+
+EXPLAIN
+-- No projection should be done as o1 is requested in its entirety
+SELECT o1
+FROM orders o1, orders o2
+WHERE o1.o_carrier_id = o2.o_carrier_id
+  -- Both predicates should be pushed and combined as a disjunctive predicate
+  AND o1.o_ol_cnt > 4
+  AND o2.o_ol_cnt = 1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.031.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.031.query.sqlpp
new file mode 100644
index 0000000..e1c3188
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.031.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+
+EXPLAIN
+-- only 'o_id', 'o_carrier_id', and 'o_ol_cnt' should be projected
+SELECT o1.o_id o1_id, o2.o_id o2_id
+FROM orders o1, orders o2
+WHERE o1.o_carrier_id = o2.o_carrier_id
+  -- Both predicates should be pushed and combined as a disjunctive predicate
+  AND o1.o_ol_cnt > 4
+  AND o2.o_ol_cnt = 1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.032.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.032.query.sqlpp
new file mode 100644
index 0000000..60e9865
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.032.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+
+EXPLAIN
+-- only 'o_id', 'o_carrier_id','o_ol_cnt_1', and 'o_ol_cnt_2' should be projected
+SELECT o1.o_id o1_id, o2.o_id o2_id
+FROM orders o1, orders o2
+WHERE o1.o_carrier_id = o2.o_carrier_id
+  -- Both predicates should be pushed and combined as a disjunctive predicate
+  AND o1.o_ol_cnt_1 > 4
+  AND o2.o_ol_cnt_2 = 1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.033.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.033.query.sqlpp
new file mode 100644
index 0000000..593f8e2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.033.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+
+EXPLAIN
+-- No projection should be done as both o1 and o2 are requested in their entirety
+SELECT o1, o2
+FROM orders o1, orders o2
+WHERE o1.o_carrier_id = o2.o_carrier_id
+  -- Both predicates should be pushed and combined as a disjunctive predicate
+  AND o1.o_ol_cnt_1 > 4
+  AND o2.o_ol_cnt_2 = 1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.034.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.034.query.sqlpp
new file mode 100644
index 0000000..4a2573ff
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/replicate/replicate.034.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+
+EXPLAIN
+-- Union access
+SELECT o1.o_orderline.ol_dist_info o1_dist_info, o2.o_orderline[0].ol_dist_info o2_dist_info
+FROM orders o1, orders o2
+WHERE o1.o_carrier_id = o2.o_carrier_id
+  -- Both predicates should be pushed and combined as a disjunctive predicate
+  AND o1.o_ol_cnt_1 > 4
+  AND o2.o_ol_cnt_2 = 1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.010.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.010.plan
new file mode 100644
index 0000000..aa9bed7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.010.plan
@@ -0,0 +1,282 @@
+distribute result [$$223] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$223]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$223] <- [{"su_suppkey": $$230, "su_name": $$272, "su_address": $$273, "su_phone": $$274, "total_revenue": $$231}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- SORT_MERGE_EXCHANGE [$$230(ASC) ]  |PARTITIONED|
+          order (ASC, $$230) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STABLE_SORT [$$230(ASC)]  |PARTITIONED|
+            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$230, $$272, $$273, $$274, $$231]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  join (eq($$230, $#1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HYBRID_HASH_JOIN [$#1][$$230]  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- HASH_PARTITION_EXCHANGE [$#1]  |PARTITIONED|
+                      project ([$$231, $#1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          join (eq($$231, $$268)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- HYBRID_HASH_JOIN [$$231][$$268]  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- HASH_PARTITION_EXCHANGE [$$231]  |PARTITIONED|
+                              project ([$$231, $#1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                assign [$$231] <- [{"supplier_no": $#1, "total_rev": $$240}.getField("total_revenue")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    group by ([$#1 := $$280]) decor ([]) {
+                                              aggregate [$$240] <- [agg-global-sql-sum($$279)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- AGGREGATE  |LOCAL|
+                                                nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                           } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- SORT_GROUP_BY[$$280]  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- HASH_PARTITION_EXCHANGE [$$280]  |PARTITIONED|
+                                        group by ([$$280 := $$225]) decor ([]) {
+                                                  aggregate [$$279] <- [agg-local-sql-sum($$169)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- AGGREGATE  |LOCAL|
+                                                    nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                               } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- SORT_GROUP_BY[$$225]  |PARTITIONED|
+                                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            project ([$$169, $$225]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              assign [$$225] <- [numeric-mod(numeric-multiply($$228, $$229), 10000)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- ASSIGN  |PARTITIONED|
+                                                project ([$$228, $$229, $$169]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    join (and(eq($$262, $$229), eq($$263, $$228))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- HYBRID_HASH_JOIN [$$229, $$228][$$262, $$263]  |PARTITIONED|
+                                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        project ([$$228, $$229]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          assign [$$228, $$229] <- [$$250, $$251] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                              -- REPLICATE  |PARTITIONED|
+                                                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                -- HASH_PARTITION_EXCHANGE [$$251, $$250]  |PARTITIONED|
+                                                                  project ([$$250, $$251]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    assign [$$251, $$250] <- [$$253.getField("s_i_id"), $$253.getField("s_w_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                    -- ASSIGN  |PARTITIONED|
+                                                                      project ([$$253]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          data-scan []<-[$$258, $$253] <- test.stock project ({s_w_id:any,s_i_id:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- HASH_PARTITION_EXCHANGE [$$262, $$263]  |PARTITIONED|
+                                                        project ([$$169, $$262, $$263]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          select (and(ge($$227, "2018-01-01 00:00:00.000000"), lt($$227, "2018-04-01 00:00:00.000000"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- STREAM_SELECT  |PARTITIONED|
+                                                            project ([$$169, $$263, $$262, $$227]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              assign [$$169, $$263, $$262, $$227] <- [$$259, $$267, $$266, $$256] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                  -- REPLICATE  |PARTITIONED|
+                                                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      project ([$$259, $$267, $$266, $$256]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        assign [$$259, $$267, $$266, $$256] <- [$$255.getField("ol_amount"), $$255.getField("ol_supply_w_id"), $$255.getField("ol_i_id"), $$255.getField("ol_delivery_d")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                        -- ASSIGN  |PARTITIONED|
+                                                                          project ([$$255]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            unnest $$255 <- scan-collection($$270) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                            -- UNNEST  |PARTITIONED|
+                                                                              project ([$$270]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                assign [$$270] <- [$$254.getField("o_orderline")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                  project ([$$254]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      unnest-map [$$257, $$254] <- index-search("orders", 0, "Default", "test", "orders", false, false, 1, $$291, 1, $$291, true, true, true) project ({o_orderline:[{ol_delivery_d:any,ol_i_id:any,ol_supply_w_id:any,ol_amount:any}]}) range-filter on: and(ge(scan-collection($$254.getField("o_orderline")).getField("ol_delivery_d"), "2018-01-01 00:00:00.000000"), lt(scan-collection($$254.getField("o_orderline")).getField("ol_delivery_d"), "2018-04-01 00:00:00.000000")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                      -- BTREE_SEARCH  |PARTITIONED|
+                                                                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          distinct ([$$291]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                          -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+                                                                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              order (ASC, $$291) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                              -- STABLE_SORT [$$291(ASC)]  |PARTITIONED|
+                                                                                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  project ([$$291]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      unnest-map [$$290, $$291] <- index-search("orderline_delivery_d", 0, "Default", "test", "orders", false, false, 1, $$288, 1, $$289, true, false, false) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                      -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                          assign [$$288, $$289] <- ["2018-01-01 00:00:00.000000", "2018-04-01 00:00:00.000000"] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                                            empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- HASH_PARTITION_EXCHANGE [$$268]  |PARTITIONED|
+                              project ([$$268]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_PROJECT  |UNPARTITIONED|
+                                assign [$$268] <- [get-item($$213, 0)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |UNPARTITIONED|
+                                  aggregate [$$213] <- [listify($$278)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- AGGREGATE  |UNPARTITIONED|
+                                    aggregate [$$278] <- [agg-global-sql-max($$281)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- AGGREGATE  |UNPARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                        aggregate [$$281] <- [agg-local-sql-max($$210)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- AGGREGATE  |PARTITIONED|
+                                          project ([$$210]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            assign [$$210] <- [{"supplier_no": $$247, "total_rev": $$265}.getField("total_revenue")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- ASSIGN  |PARTITIONED|
+                                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                group by ([$$247 := $$283]) decor ([]) {
+                                                          aggregate [$$265] <- [agg-global-sql-sum($$282)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- AGGREGATE  |LOCAL|
+                                                            nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                       } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- SORT_GROUP_BY[$$283]  |PARTITIONED|
+                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- HASH_PARTITION_EXCHANGE [$$283]  |PARTITIONED|
+                                                    group by ([$$283 := $$249]) decor ([]) {
+                                                              aggregate [$$282] <- [agg-local-sql-sum($$259)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                              -- AGGREGATE  |LOCAL|
+                                                                nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                           } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- SORT_GROUP_BY[$$249]  |PARTITIONED|
+                                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        project ([$$259, $$249]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          assign [$$249] <- [numeric-mod(numeric-multiply($$250, $$251), 10000)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            project ([$$250, $$251, $$259]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                join (and(eq($$266, $$251), eq($$267, $$250))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                -- HYBRID_HASH_JOIN [$$251, $$250][$$266, $$267]  |PARTITIONED|
+                                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                    -- REPLICATE  |PARTITIONED|
+                                                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                      -- HASH_PARTITION_EXCHANGE [$$251, $$250]  |PARTITIONED|
+                                                                        project ([$$250, $$251]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          assign [$$251, $$250] <- [$$253.getField("s_i_id"), $$253.getField("s_w_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                          -- ASSIGN  |PARTITIONED|
+                                                                            project ([$$253]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                data-scan []<-[$$258, $$253] <- test.stock project ({s_w_id:any,s_i_id:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                  -- HASH_PARTITION_EXCHANGE [$$266, $$267]  |PARTITIONED|
+                                                                    project ([$$259, $$266, $$267]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      select (and(lt($$256, "2018-04-01 00:00:00.000000"), ge($$256, "2018-01-01 00:00:00.000000"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                      -- STREAM_SELECT  |PARTITIONED|
+                                                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                          -- REPLICATE  |PARTITIONED|
+                                                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              project ([$$259, $$267, $$266, $$256]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                assign [$$259, $$267, $$266, $$256] <- [$$255.getField("ol_amount"), $$255.getField("ol_supply_w_id"), $$255.getField("ol_i_id"), $$255.getField("ol_delivery_d")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                  project ([$$255]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                    unnest $$255 <- scan-collection($$270) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                    -- UNNEST  |PARTITIONED|
+                                                                                      project ([$$270]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                        assign [$$270] <- [$$254.getField("o_orderline")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                        -- ASSIGN  |PARTITIONED|
+                                                                                          project ([$$254]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              unnest-map [$$257, $$254] <- index-search("orders", 0, "Default", "test", "orders", false, false, 1, $$291, 1, $$291, true, true, true) project ({o_orderline:[{ol_delivery_d:any,ol_i_id:any,ol_supply_w_id:any,ol_amount:any}]}) range-filter on: and(ge(scan-collection($$254.getField("o_orderline")).getField("ol_delivery_d"), "2018-01-01 00:00:00.000000"), lt(scan-collection($$254.getField("o_orderline")).getField("ol_delivery_d"), "2018-04-01 00:00:00.000000")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  distinct ([$$291]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                  -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+                                                                                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      order (ASC, $$291) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                      -- STABLE_SORT [$$291(ASC)]  |PARTITIONED|
+                                                                                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                          project ([$$291]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                              unnest-map [$$290, $$291] <- index-search("orderline_delivery_d", 0, "Default", "test", "orders", false, false, 1, $$288, 1, $$289, true, false, false) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                                                                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  assign [$$288, $$289] <- ["2018-01-01 00:00:00.000000", "2018-04-01 00:00:00.000000"] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- HASH_PARTITION_EXCHANGE [$$230]  |PARTITIONED|
+                      project ([$$230, $$272, $$273, $$274]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$274, $$273, $$272, $$230] <- [$$su.getField("su_phone"), $$su.getField("su_address"), $$su.getField("su_name"), $$su.getField("su_suppkey")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$su]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$234, $$su] <- test.supplier project ({su_phone:any,su_suppkey:any,su_name:any,su_address:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.011.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.011.plan
new file mode 100644
index 0000000..5abc9f5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.011.plan
@@ -0,0 +1,246 @@
+distribute result [$$223] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$223]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$223] <- [{"su_suppkey": $$230, "su_name": $$272, "su_address": $$273, "su_phone": $$274, "total_revenue": $$231}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- SORT_MERGE_EXCHANGE [$$230(ASC) ]  |PARTITIONED|
+          order (ASC, $$230) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STABLE_SORT [$$230(ASC)]  |PARTITIONED|
+            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$230, $$272, $$273, $$274, $$231]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  join (eq($$230, $#1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HYBRID_HASH_JOIN [$#1][$$230]  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- HASH_PARTITION_EXCHANGE [$#1]  |PARTITIONED|
+                      project ([$$231, $#1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          join (eq($$231, $$268)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- HYBRID_HASH_JOIN [$$231][$$268]  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- HASH_PARTITION_EXCHANGE [$$231]  |PARTITIONED|
+                              project ([$$231, $#1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                assign [$$231] <- [{"supplier_no": $#1, "total_rev": $$240}.getField("total_revenue")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    group by ([$#1 := $$280]) decor ([]) {
+                                              aggregate [$$240] <- [agg-global-sql-sum($$279)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- AGGREGATE  |LOCAL|
+                                                nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                           } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- SORT_GROUP_BY[$$280]  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- HASH_PARTITION_EXCHANGE [$$280]  |PARTITIONED|
+                                        group by ([$$280 := $$225]) decor ([]) {
+                                                  aggregate [$$279] <- [agg-local-sql-sum($$169)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- AGGREGATE  |LOCAL|
+                                                    nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                               } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- SORT_GROUP_BY[$$225]  |PARTITIONED|
+                                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            project ([$$169, $$225]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              assign [$$225] <- [numeric-mod(numeric-multiply($$228, $$229), 10000)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- ASSIGN  |PARTITIONED|
+                                                project ([$$228, $$229, $$169]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    join (and(eq($$262, $$229), eq($$263, $$228))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- HYBRID_HASH_JOIN [$$229, $$228][$$262, $$263]  |PARTITIONED|
+                                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        project ([$$228, $$229]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          assign [$$228, $$229] <- [$$250, $$251] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                              -- REPLICATE  |PARTITIONED|
+                                                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                -- HASH_PARTITION_EXCHANGE [$$251, $$250]  |PARTITIONED|
+                                                                  project ([$$250, $$251]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    assign [$$251, $$250] <- [$$253.getField("s_i_id"), $$253.getField("s_w_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                    -- ASSIGN  |PARTITIONED|
+                                                                      project ([$$253]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          data-scan []<-[$$258, $$253] <- test.stock project ({s_w_id:any,s_i_id:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- HASH_PARTITION_EXCHANGE [$$262, $$263]  |PARTITIONED|
+                                                        project ([$$169, $$262, $$263]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          select (and(ge($$227, "2018-01-01 00:00:00.000000"), lt($$227, "2018-04-01 00:00:00.000000"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- STREAM_SELECT  |PARTITIONED|
+                                                            project ([$$169, $$263, $$262, $$227]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              assign [$$169, $$263, $$262, $$227] <- [$$259, $$267, $$266, $$256] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                  -- REPLICATE  |PARTITIONED|
+                                                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      project ([$$259, $$267, $$266, $$256]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        assign [$$259, $$267, $$266, $$256] <- [$$255.getField("ol_amount"), $$255.getField("ol_supply_w_id"), $$255.getField("ol_i_id"), $$255.getField("ol_delivery_d")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                        -- ASSIGN  |PARTITIONED|
+                                                                          project ([$$255]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            unnest $$255 <- scan-collection($$270) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                            -- UNNEST  |PARTITIONED|
+                                                                              project ([$$270]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                assign [$$270] <- [$$254.getField("o_orderline")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                  project ([$$254]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      data-scan []<-[$$257, $$254] <- test.orders project ({o_orderline:[{ol_delivery_d:any,ol_i_id:any,ol_supply_w_id:any,ol_amount:any}]}) filter on: and(ge(scan-collection($$254.getField("o_orderline")).getField("ol_delivery_d"), "2018-01-01 00:00:00.000000"), lt(scan-collection($$254.getField("o_orderline")).getField("ol_delivery_d"), "2018-04-01 00:00:00.000000")) range-filter on: and(ge(scan-collection($$254.getField("o_orderline")).getField("ol_delivery_d"), "2018-01-01 00:00:00.000000"), lt(scan-collection($$254.getField("o_orderline")).getField("ol_delivery_d"), "2018-04-01 00:00:00.000000")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- HASH_PARTITION_EXCHANGE [$$268]  |PARTITIONED|
+                              project ([$$268]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_PROJECT  |UNPARTITIONED|
+                                assign [$$268] <- [get-item($$213, 0)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |UNPARTITIONED|
+                                  aggregate [$$213] <- [listify($$278)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- AGGREGATE  |UNPARTITIONED|
+                                    aggregate [$$278] <- [agg-global-sql-max($$281)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- AGGREGATE  |UNPARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                        aggregate [$$281] <- [agg-local-sql-max($$210)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- AGGREGATE  |PARTITIONED|
+                                          project ([$$210]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            assign [$$210] <- [{"supplier_no": $$247, "total_rev": $$265}.getField("total_revenue")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- ASSIGN  |PARTITIONED|
+                                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                group by ([$$247 := $$283]) decor ([]) {
+                                                          aggregate [$$265] <- [agg-global-sql-sum($$282)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- AGGREGATE  |LOCAL|
+                                                            nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                       } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- SORT_GROUP_BY[$$283]  |PARTITIONED|
+                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- HASH_PARTITION_EXCHANGE [$$283]  |PARTITIONED|
+                                                    group by ([$$283 := $$249]) decor ([]) {
+                                                              aggregate [$$282] <- [agg-local-sql-sum($$259)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                              -- AGGREGATE  |LOCAL|
+                                                                nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                           } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- SORT_GROUP_BY[$$249]  |PARTITIONED|
+                                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        project ([$$259, $$249]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          assign [$$249] <- [numeric-mod(numeric-multiply($$250, $$251), 10000)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            project ([$$250, $$251, $$259]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                join (and(eq($$266, $$251), eq($$267, $$250))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                -- HYBRID_HASH_JOIN [$$251, $$250][$$266, $$267]  |PARTITIONED|
+                                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                    -- REPLICATE  |PARTITIONED|
+                                                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                      -- HASH_PARTITION_EXCHANGE [$$251, $$250]  |PARTITIONED|
+                                                                        project ([$$250, $$251]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          assign [$$251, $$250] <- [$$253.getField("s_i_id"), $$253.getField("s_w_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                          -- ASSIGN  |PARTITIONED|
+                                                                            project ([$$253]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                data-scan []<-[$$258, $$253] <- test.stock project ({s_w_id:any,s_i_id:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                  -- HASH_PARTITION_EXCHANGE [$$266, $$267]  |PARTITIONED|
+                                                                    project ([$$259, $$266, $$267]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      select (and(lt($$256, "2018-04-01 00:00:00.000000"), ge($$256, "2018-01-01 00:00:00.000000"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                      -- STREAM_SELECT  |PARTITIONED|
+                                                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                          -- REPLICATE  |PARTITIONED|
+                                                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              project ([$$259, $$267, $$266, $$256]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                assign [$$259, $$267, $$266, $$256] <- [$$255.getField("ol_amount"), $$255.getField("ol_supply_w_id"), $$255.getField("ol_i_id"), $$255.getField("ol_delivery_d")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                  project ([$$255]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                    unnest $$255 <- scan-collection($$270) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                    -- UNNEST  |PARTITIONED|
+                                                                                      project ([$$270]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                        assign [$$270] <- [$$254.getField("o_orderline")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                        -- ASSIGN  |PARTITIONED|
+                                                                                          project ([$$254]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              data-scan []<-[$$257, $$254] <- test.orders project ({o_orderline:[{ol_delivery_d:any,ol_i_id:any,ol_supply_w_id:any,ol_amount:any}]}) filter on: and(ge(scan-collection($$254.getField("o_orderline")).getField("ol_delivery_d"), "2018-01-01 00:00:00.000000"), lt(scan-collection($$254.getField("o_orderline")).getField("ol_delivery_d"), "2018-04-01 00:00:00.000000")) range-filter on: and(ge(scan-collection($$254.getField("o_orderline")).getField("ol_delivery_d"), "2018-01-01 00:00:00.000000"), lt(scan-collection($$254.getField("o_orderline")).getField("ol_delivery_d"), "2018-04-01 00:00:00.000000")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- HASH_PARTITION_EXCHANGE [$$230]  |PARTITIONED|
+                      project ([$$230, $$272, $$273, $$274]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$274, $$273, $$272, $$230] <- [$$su.getField("su_phone"), $$su.getField("su_address"), $$su.getField("su_name"), $$su.getField("su_suppkey")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$su]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$234, $$su] <- test.supplier project ({su_phone:any,su_suppkey:any,su_name:any,su_address:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.020.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.020.plan
new file mode 100644
index 0000000..1c34eac
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.020.plan
@@ -0,0 +1,70 @@
+distribute result [$$54] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$54]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$54] <- [{"$1": $$59}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$59] <- [agg-sql-sum($$62)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$62] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- AGGREGATE  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                join (eq($$57, $$58)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- HYBRID_HASH_JOIN [$$57][$$58]  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HASH_PARTITION_EXCHANGE [$$57]  |PARTITIONED|
+                    project ([$$57]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      select (gt($$o1.getField("o_ol_cnt"), 4)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_SELECT  |PARTITIONED|
+                        project ([$$o1, $$57]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          assign [$$o1, $$57] <- [$$o2, $$58] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- REPLICATE  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  assign [$$58] <- [$$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ASSIGN  |PARTITIONED|
+                                    project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        data-scan []<-[$$56, $$o2] <- test.orders project ({o_carrier_id:any,o_ol_cnt:any}) filter on: or(gt($$o2.getField("o_ol_cnt"), 4), eq($$o2.getField("o_ol_cnt"), 1)) range-filter on: or(gt($$o2.getField("o_ol_cnt"), 4), eq($$o2.getField("o_ol_cnt"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HASH_PARTITION_EXCHANGE [$$58]  |PARTITIONED|
+                    project ([$$58]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      select (eq($$o2.getField("o_ol_cnt"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_SELECT  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- REPLICATE  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              assign [$$58] <- [$$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ASSIGN  |PARTITIONED|
+                                project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    data-scan []<-[$$56, $$o2] <- test.orders project ({o_carrier_id:any,o_ol_cnt:any}) filter on: or(gt($$o2.getField("o_ol_cnt"), 4), eq($$o2.getField("o_ol_cnt"), 1)) range-filter on: or(gt($$o2.getField("o_ol_cnt"), 4), eq($$o2.getField("o_ol_cnt"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.021.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.021.plan
new file mode 100644
index 0000000..41e6763
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.021.plan
@@ -0,0 +1,87 @@
+distribute result [$$68] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$68]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$68] <- [{"$1": $$71}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$71] <- [agg-sql-sum($$78)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$78] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- AGGREGATE  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                join (eq($$72, $$73)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- HYBRID_HASH_JOIN [$$72][$$73]  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HASH_PARTITION_EXCHANGE [$$72]  |PARTITIONED|
+                    project ([$$72]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      select (eq($$o1.getField("o_orderline").getField("ol_dist_info"), "x")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_SELECT  |PARTITIONED|
+                        assign [$$72] <- [$$o1.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$o1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            assign [$$o1] <- [$$o2] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ASSIGN  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- REPLICATE  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        data-scan []<-[$$70, $$o2] <- test.orders project ({o_carrier_id:any,o_orderline:<[{ol_dist_info:any}],{ol_dist_info:any}>}) filter on: or(eq($$o2.getField("o_orderline").getField("ol_dist_info"), "x"), eq(scan-collection($$o2.getField("o_orderline")).getField("ol_dist_info"), "x")) range-filter on: or(eq($$o2.getField("o_orderline").getField("ol_dist_info"), "x"), eq(scan-collection($$o2.getField("o_orderline")).getField("ol_dist_info"), "x")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HASH_PARTITION_EXCHANGE [$$73]  |PARTITIONED|
+                    project ([$$73]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      select ($$57) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_SELECT  |PARTITIONED|
+                        project ([$$73, $$57]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          subplan {
+                                    aggregate [$$57] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- AGGREGATE  |LOCAL|
+                                      select (eq($$76, "x")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- STREAM_SELECT  |LOCAL|
+                                        assign [$$76] <- [$$ol.getField("ol_dist_info")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- ASSIGN  |LOCAL|
+                                          unnest $$ol <- scan-collection($$75) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- UNNEST  |LOCAL|
+                                            nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                 } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- SUBPLAN  |PARTITIONED|
+                            project ([$$73, $$75]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              assign [$$73, $$75] <- [$$o2.getField("o_carrier_id"), $$o2.getField("o_orderline")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ASSIGN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- REPLICATE  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          data-scan []<-[$$70, $$o2] <- test.orders project ({o_carrier_id:any,o_orderline:<[{ol_dist_info:any}],{ol_dist_info:any}>}) filter on: or(eq($$o2.getField("o_orderline").getField("ol_dist_info"), "x"), eq(scan-collection($$o2.getField("o_orderline")).getField("ol_dist_info"), "x")) range-filter on: or(eq($$o2.getField("o_orderline").getField("ol_dist_info"), "x"), eq(scan-collection($$o2.getField("o_orderline")).getField("ol_dist_info"), "x")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.022.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.022.plan
new file mode 100644
index 0000000..d518e0f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.022.plan
@@ -0,0 +1,66 @@
+distribute result [$$52] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$52]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$52] <- [{"$1": $$57}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$57] <- [agg-sql-sum($$59)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$59] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- AGGREGATE  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                join (eq($$55, $$56)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- HYBRID_HASH_JOIN [$$55][$$56]  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HASH_PARTITION_EXCHANGE [$$55]  |PARTITIONED|
+                    project ([$$55]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      assign [$$o1, $$55] <- [$$o2, $$56] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ASSIGN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- REPLICATE  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              assign [$$56] <- [$$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ASSIGN  |PARTITIONED|
+                                project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    data-scan []<-[$$54, $$o2] <- test.orders project ({o_carrier_id:any,o_ol_cnt:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HASH_PARTITION_EXCHANGE [$$56]  |PARTITIONED|
+                    project ([$$56]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      select (eq($$o2.getField("o_ol_cnt"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_SELECT  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- REPLICATE  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              assign [$$56] <- [$$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ASSIGN  |PARTITIONED|
+                                project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    data-scan []<-[$$54, $$o2] <- test.orders project ({o_carrier_id:any,o_ol_cnt:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.023.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.023.plan
new file mode 100644
index 0000000..49a699f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.023.plan
@@ -0,0 +1,68 @@
+distribute result [$$54] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$54]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$54] <- [{"$1": $$59}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$59] <- [agg-sql-sum($$62)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$62] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- AGGREGATE  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                join (eq($$57, $$58)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- HYBRID_HASH_JOIN [$$57][$$58]  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    project ([$$57]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      assign [$$57] <- [$$58] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ASSIGN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- REPLICATE  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- HASH_PARTITION_EXCHANGE [$$58]  |PARTITIONED|
+                              project ([$$58]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                select (eq($$o2.getField("o_ol_cnt"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_SELECT  |PARTITIONED|
+                                  assign [$$58] <- [$$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ASSIGN  |PARTITIONED|
+                                    project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        data-scan []<-[$$56, $$o2] <- test.orders project ({o_carrier_id:any,o_ol_cnt:any}) filter on: eq($$o2.getField("o_ol_cnt"), 1) range-filter on: eq($$o2.getField("o_ol_cnt"), 1) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- REPLICATE  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- HASH_PARTITION_EXCHANGE [$$58]  |PARTITIONED|
+                        project ([$$58]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          select (eq($$o2.getField("o_ol_cnt"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- STREAM_SELECT  |PARTITIONED|
+                            assign [$$58] <- [$$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ASSIGN  |PARTITIONED|
+                              project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  data-scan []<-[$$56, $$o2] <- test.orders project ({o_carrier_id:any,o_ol_cnt:any}) filter on: eq($$o2.getField("o_ol_cnt"), 1) range-filter on: eq($$o2.getField("o_ol_cnt"), 1) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.030.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.030.plan
new file mode 100644
index 0000000..c978be9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.030.plan
@@ -0,0 +1,64 @@
+distribute result [$$34] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$34]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$34] <- [{"o1": $$o1}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        project ([$$o1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            join (eq($$37, $$38)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- HYBRID_HASH_JOIN [$$37][$$38]  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- HASH_PARTITION_EXCHANGE [$$37]  |PARTITIONED|
+                select (gt($$o1.getField("o_ol_cnt"), 4)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  project ([$$o1, $$37]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    assign [$$o1, $$37] <- [$$o2, $$38] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ASSIGN  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- REPLICATE  |PARTITIONED|
+                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            assign [$$38] <- [$$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ASSIGN  |PARTITIONED|
+                              project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  data-scan []<-[$$36, $$o2] <- test.orders filter on: or(gt($$o2.getField("o_ol_cnt"), 4), eq($$o2.getField("o_ol_cnt"), 1)) range-filter on: or(gt($$o2.getField("o_ol_cnt"), 4), eq($$o2.getField("o_ol_cnt"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- HASH_PARTITION_EXCHANGE [$$38]  |PARTITIONED|
+                project ([$$38]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  select (eq($$o2.getField("o_ol_cnt"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_SELECT  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- REPLICATE  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          assign [$$38] <- [$$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |PARTITIONED|
+                            project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                data-scan []<-[$$36, $$o2] <- test.orders filter on: or(gt($$o2.getField("o_ol_cnt"), 4), eq($$o2.getField("o_ol_cnt"), 1)) range-filter on: or(gt($$o2.getField("o_ol_cnt"), 4), eq($$o2.getField("o_ol_cnt"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.031.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.031.plan
new file mode 100644
index 0000000..90d57ef
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.031.plan
@@ -0,0 +1,66 @@
+distribute result [$$36] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$36]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$36] <- [{"o1_id": $$43, "o2_id": $$44}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        project ([$$43, $$44]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            join (eq($$39, $$40)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- HYBRID_HASH_JOIN [$$39][$$40]  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- HASH_PARTITION_EXCHANGE [$$39]  |PARTITIONED|
+                project ([$$43, $$39]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  select (gt($$o1.getField("o_ol_cnt"), 4)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_SELECT  |PARTITIONED|
+                    project ([$$o1, $$43, $$39]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      assign [$$o1, $$43, $$39] <- [$$o2, $$44, $$40] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ASSIGN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- REPLICATE  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              assign [$$44, $$40] <- [$$o2.getField("o_id"), $$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ASSIGN  |PARTITIONED|
+                                project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    data-scan []<-[$$38, $$o2] <- test.orders project ({o_carrier_id:any,o_id:any,o_ol_cnt:any}) filter on: or(gt($$o2.getField("o_ol_cnt"), 4), eq($$o2.getField("o_ol_cnt"), 1)) range-filter on: or(gt($$o2.getField("o_ol_cnt"), 4), eq($$o2.getField("o_ol_cnt"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- HASH_PARTITION_EXCHANGE [$$40]  |PARTITIONED|
+                project ([$$44, $$40]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  select (eq($$o2.getField("o_ol_cnt"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_SELECT  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- REPLICATE  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          assign [$$44, $$40] <- [$$o2.getField("o_id"), $$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |PARTITIONED|
+                            project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                data-scan []<-[$$38, $$o2] <- test.orders project ({o_carrier_id:any,o_id:any,o_ol_cnt:any}) filter on: or(gt($$o2.getField("o_ol_cnt"), 4), eq($$o2.getField("o_ol_cnt"), 1)) range-filter on: or(gt($$o2.getField("o_ol_cnt"), 4), eq($$o2.getField("o_ol_cnt"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.032.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.032.plan
new file mode 100644
index 0000000..1513679
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.032.plan
@@ -0,0 +1,66 @@
+distribute result [$$36] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$36]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$36] <- [{"o1_id": $$43, "o2_id": $$44}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        project ([$$43, $$44]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            join (eq($$39, $$40)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- HYBRID_HASH_JOIN [$$39][$$40]  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- HASH_PARTITION_EXCHANGE [$$39]  |PARTITIONED|
+                project ([$$43, $$39]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  select (gt($$o1.getField("o_ol_cnt_1"), 4)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_SELECT  |PARTITIONED|
+                    project ([$$o1, $$43, $$39]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      assign [$$o1, $$43, $$39] <- [$$o2, $$44, $$40] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ASSIGN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- REPLICATE  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              assign [$$44, $$40] <- [$$o2.getField("o_id"), $$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ASSIGN  |PARTITIONED|
+                                project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    data-scan []<-[$$38, $$o2] <- test.orders project ({o_carrier_id:any,o_id:any,o_ol_cnt_2:any,o_ol_cnt_1:any}) filter on: or(gt($$o2.getField("o_ol_cnt_1"), 4), eq($$o2.getField("o_ol_cnt_2"), 1)) range-filter on: or(gt($$o2.getField("o_ol_cnt_1"), 4), eq($$o2.getField("o_ol_cnt_2"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- HASH_PARTITION_EXCHANGE [$$40]  |PARTITIONED|
+                project ([$$44, $$40]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  select (eq($$o2.getField("o_ol_cnt_2"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_SELECT  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- REPLICATE  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          assign [$$44, $$40] <- [$$o2.getField("o_id"), $$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |PARTITIONED|
+                            project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                data-scan []<-[$$38, $$o2] <- test.orders project ({o_carrier_id:any,o_id:any,o_ol_cnt_2:any,o_ol_cnt_1:any}) filter on: or(gt($$o2.getField("o_ol_cnt_1"), 4), eq($$o2.getField("o_ol_cnt_2"), 1)) range-filter on: or(gt($$o2.getField("o_ol_cnt_1"), 4), eq($$o2.getField("o_ol_cnt_2"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.033.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.033.plan
new file mode 100644
index 0000000..1d45d6e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.033.plan
@@ -0,0 +1,62 @@
+distribute result [$$34] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$34]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$34] <- [{"o1": $$o1, "o2": $$o2}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        project ([$$o1, $$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            join (eq($$37, $$38)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- HYBRID_HASH_JOIN [$$37][$$38]  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- HASH_PARTITION_EXCHANGE [$$37]  |PARTITIONED|
+                select (gt($$o1.getField("o_ol_cnt_1"), 4)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  project ([$$o1, $$37]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    assign [$$o1, $$37] <- [$$o2, $$38] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ASSIGN  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- REPLICATE  |PARTITIONED|
+                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            assign [$$38] <- [$$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ASSIGN  |PARTITIONED|
+                              project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  data-scan []<-[$$36, $$o2] <- test.orders filter on: or(gt($$o2.getField("o_ol_cnt_1"), 4), eq($$o2.getField("o_ol_cnt_2"), 1)) range-filter on: or(gt($$o2.getField("o_ol_cnt_1"), 4), eq($$o2.getField("o_ol_cnt_2"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- HASH_PARTITION_EXCHANGE [$$38]  |PARTITIONED|
+                select (eq($$o2.getField("o_ol_cnt_2"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- REPLICATE  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        assign [$$38] <- [$$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$36, $$o2] <- test.orders filter on: or(gt($$o2.getField("o_ol_cnt_1"), 4), eq($$o2.getField("o_ol_cnt_2"), 1)) range-filter on: or(gt($$o2.getField("o_ol_cnt_1"), 4), eq($$o2.getField("o_ol_cnt_2"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.034.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.034.plan
new file mode 100644
index 0000000..8c67a3f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/replicate/replicate.034.plan
@@ -0,0 +1,70 @@
+distribute result [$$39] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$39]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$39] <- [{"o1_dist_info": $$46, "o2_dist_info": $$47}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        project ([$$46, $$47]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            join (eq($$42, $$43)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- HYBRID_HASH_JOIN [$$42][$$43]  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- HASH_PARTITION_EXCHANGE [$$42]  |PARTITIONED|
+                project ([$$46, $$42]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$46] <- [$$o1.getField("o_orderline").getField("ol_dist_info")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ASSIGN  |PARTITIONED|
+                    select (gt($$o1.getField("o_ol_cnt_1"), 4)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_SELECT  |PARTITIONED|
+                      project ([$$o1, $$42]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$o1, $$42] <- [$$o2, $$43] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ASSIGN  |PARTITIONED|
+                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- REPLICATE  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                assign [$$43] <- [$$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |PARTITIONED|
+                                  project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      data-scan []<-[$$41, $$o2] <- test.orders project ({o_carrier_id:any,o_ol_cnt_2:any,o_ol_cnt_1:any,o_orderline:<[{ol_dist_info:any}],{ol_dist_info:any}>}) filter on: or(gt($$o2.getField("o_ol_cnt_1"), 4), eq($$o2.getField("o_ol_cnt_2"), 1)) range-filter on: or(gt($$o2.getField("o_ol_cnt_1"), 4), eq($$o2.getField("o_ol_cnt_2"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- HASH_PARTITION_EXCHANGE [$$43]  |PARTITIONED|
+                project ([$$47, $$43]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$47] <- [get-item($$o2.getField("o_orderline"), 0).getField("ol_dist_info")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ASSIGN  |PARTITIONED|
+                    select (eq($$o2.getField("o_ol_cnt_2"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_SELECT  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- REPLICATE  |PARTITIONED|
+                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            assign [$$43] <- [$$o2.getField("o_carrier_id")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ASSIGN  |PARTITIONED|
+                              project ([$$o2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  data-scan []<-[$$41, $$o2] <- test.orders project ({o_carrier_id:any,o_ol_cnt_2:any,o_ol_cnt_1:any,o_orderline:<[{ol_dist_info:any}],{ol_dist_info:any}>}) filter on: or(gt($$o2.getField("o_ol_cnt_1"), 4), eq($$o2.getField("o_ol_cnt_2"), 1)) range-filter on: or(gt($$o2.getField("o_ol_cnt_1"), 4), eq($$o2.getField("o_ol_cnt_2"), 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
index 831cf6b..110f95d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
@@ -16314,6 +16314,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="column">
+      <compilation-unit name="pushdown/replicate">
+        <output-dir compare="Text">pushdown/replicate</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="column">
       <compilation-unit name="select-count-one-field">
         <output-dir compare="Text">select-count-one-field</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
index a4bca83..ad70fca 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
@@ -78,7 +78,8 @@
 
     public static final String IMPORT_PRIVATE_FUNCTIONS = "import-private-functions";
     //TODO(wyk) add Multiply and Add
-    private static final Set<FunctionIdentifier> COMMUTATIVE_FUNCTIONS = Set.of(BuiltinFunctions.EQ);
+    private static final Set<FunctionIdentifier> COMMUTATIVE_FUNCTIONS =
+            Set.of(BuiltinFunctions.EQ, BuiltinFunctions.AND, BuiltinFunctions.OR);
 
     private static final DataverseName FN_DATASET_DATAVERSE_NAME =
             FunctionSignature.getDataverseName(BuiltinFunctions.DATASET);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ColumnDatasetProjectionFiltrationInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ColumnDatasetProjectionFiltrationInfo.java
index 9e9011f..c1cb71b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ColumnDatasetProjectionFiltrationInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ColumnDatasetProjectionFiltrationInfo.java
@@ -27,6 +27,7 @@
 
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksStringBuilderWriter;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -44,9 +45,18 @@
     }
 
     private ColumnDatasetProjectionFiltrationInfo(ColumnDatasetProjectionFiltrationInfo other) {
-        super(other.projectedType, other.functionCallInfoMap, other.filterPaths, other.filterExpression, false);
+        super(other.projectedType, other.functionCallInfoMap, clonePaths(other.filterPaths),
+                cloneExpression(other.filterExpression), false);
         metaProjectedType = other.metaProjectedType;
-        rangeFilterExpression = other.rangeFilterExpression;
+        rangeFilterExpression = cloneExpression(other.rangeFilterExpression);
+    }
+
+    @Override
+    public void substituteFilterVariable(LogicalVariable oldVar, LogicalVariable newVar) {
+        super.substituteFilterVariable(oldVar, newVar);
+        if (rangeFilterExpression != null) {
+            rangeFilterExpression.substituteVar(oldVar, newVar);
+        }
     }
 
     @Override
@@ -127,10 +137,8 @@
         ColumnDatasetProjectionFiltrationInfo otherInfo = (ColumnDatasetProjectionFiltrationInfo) o;
         return projectedType.deepEqual(otherInfo.projectedType)
                 && Objects.equals(metaProjectedType, otherInfo.metaProjectedType)
-                && Objects.equals(functionCallInfoMap, otherInfo.functionCallInfoMap)
-                && Objects.equals(filterExpression, otherInfo.filterExpression)
-                && Objects.equals(filterPaths, otherInfo.filterPaths)
-                && Objects.equals(rangeFilterExpression, otherInfo.rangeFilterExpression);
+                && filterExpressionEquals(filterExpression, otherInfo.filterExpression)
+                && filterExpressionEquals(rangeFilterExpression, otherInfo.rangeFilterExpression);
     }
 
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
index 3f66e73..e0ea99a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
@@ -26,24 +26,25 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Objects;
 
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.visitor.SimpleStringBuilderForIATypeVisitor;
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.EquivalentVariableExpressionComparatorVisitor;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksStringBuilderWriter;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
 public class ExternalDatasetProjectionFiltrationInfo implements IProjectionFiltrationInfo {
     protected final ARecordType projectedType;
-
     protected final ILogicalExpression filterExpression;
-    protected final Map<ILogicalExpression, ARecordType> filterPaths;
     protected final Map<String, FunctionCallInformation> functionCallInfoMap;
     private final boolean embedFilterValues;
+    protected final Map<ILogicalExpression, ARecordType> filterPaths;
 
     public ExternalDatasetProjectionFiltrationInfo(ARecordType projectedType,
             Map<String, FunctionCallInformation> sourceInformationMap, Map<ILogicalExpression, ARecordType> filterPaths,
@@ -64,13 +65,29 @@
             projectedType = other.projectedType.deepCopy(other.projectedType);
         }
         functionCallInfoMap = new HashMap<>(other.functionCallInfoMap);
-
-        filterExpression = other.filterExpression;
-        filterPaths = new HashMap<>(other.filterPaths);
+        filterExpression = cloneExpression(other.filterExpression);
+        filterPaths = clonePaths(other.filterPaths);
         embedFilterValues = other.embedFilterValues;
     }
 
     @Override
+    public void substituteFilterVariable(LogicalVariable oldVar, LogicalVariable newVar) {
+        if (filterExpression != null) {
+            filterExpression.substituteVar(oldVar, newVar);
+        }
+
+        Map<ILogicalExpression, ARecordType> newPaths = new HashMap<>(filterPaths);
+        // We need to re-add to recompute the hashCode of each expression
+        filterPaths.clear();
+        for (Map.Entry<ILogicalExpression, ARecordType> path : newPaths.entrySet()) {
+            ILogicalExpression expr = path.getKey();
+            ARecordType type = path.getValue();
+            expr.substituteVar(oldVar, newVar);
+            filterPaths.put(expr, type);
+        }
+    }
+
+    @Override
     public ExternalDatasetProjectionFiltrationInfo createCopy() {
         return new ExternalDatasetProjectionFiltrationInfo(this);
     }
@@ -105,7 +122,7 @@
         }
         ExternalDatasetProjectionFiltrationInfo otherInfo = (ExternalDatasetProjectionFiltrationInfo) o;
         return projectedType.deepEqual(otherInfo.projectedType)
-                && Objects.equals(functionCallInfoMap, otherInfo.functionCallInfoMap);
+                && filterExpressionEquals(filterExpression, otherInfo.filterExpression);
     }
 
     @Override
@@ -151,7 +168,23 @@
         }
     }
 
-    protected String getOnelinerSchema(ARecordType type, StringBuilder builder) {
+    protected static ILogicalExpression cloneExpression(ILogicalExpression expression) {
+        if (expression == null) {
+            return null;
+        }
+
+        return expression.cloneExpression();
+    }
+
+    protected static Map<ILogicalExpression, ARecordType> clonePaths(Map<ILogicalExpression, ARecordType> filterPaths) {
+        Map<ILogicalExpression, ARecordType> newFilterPaths = new HashMap<>(filterPaths.size());
+        for (Map.Entry<ILogicalExpression, ARecordType> path : filterPaths.entrySet()) {
+            newFilterPaths.put(path.getKey().cloneExpression(), path.getValue());
+        }
+        return newFilterPaths;
+    }
+
+    protected static String getOnelinerSchema(ARecordType type, StringBuilder builder) {
         //Return oneliner JSON like representation for the requested fields
         SimpleStringBuilderForIATypeVisitor visitor = new SimpleStringBuilderForIATypeVisitor();
         type.accept(visitor, builder);
@@ -160,6 +193,20 @@
         return onelinerSchema;
     }
 
+    protected static boolean filterExpressionEquals(ILogicalExpression expr1, ILogicalExpression expr2) {
+        if (expr1 == expr2) {
+            return true;
+        } else if (expr1 == null || expr2 == null) {
+            return false;
+        }
+
+        try {
+            return expr1.accept(EquivalentVariableExpressionComparatorVisitor.INSTANCE, expr2) == Boolean.TRUE;
+        } catch (AlgebricksException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * Serialize expected record type
      *
diff --git a/hyracks-fullstack/NOTICE b/hyracks-fullstack/NOTICE
index 6e9c46b..e9bb9a4 100644
--- a/hyracks-fullstack/NOTICE
+++ b/hyracks-fullstack/NOTICE
@@ -1,5 +1,5 @@
 Apache Hyracks and Algebricks
-Copyright 2015-2023 The Apache Software Foundation
+Copyright 2015-2024 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/DefaultProjectionFiltrationInfo.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/DefaultProjectionFiltrationInfo.java
index ff13007..077a597 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/DefaultProjectionFiltrationInfo.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/DefaultProjectionFiltrationInfo.java
@@ -32,6 +32,11 @@
     }
 
     @Override
+    public void substituteFilterVariable(LogicalVariable oldVar, LogicalVariable newVar) {
+        // NoOp
+    }
+
+    @Override
     public IProjectionFiltrationInfo createCopy() {
         return INSTANCE;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionFiltrationInfo.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionFiltrationInfo.java
index 149731f..4310676 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionFiltrationInfo.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionFiltrationInfo.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksStringBuilderWriter;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -29,6 +30,15 @@
  * {@link org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator}
  */
 public interface IProjectionFiltrationInfo {
+
+    /**
+     * Substitute filter expression variables
+     *
+     * @param oldVar old variable
+     * @param newVar new variable
+     */
+    void substituteFilterVariable(LogicalVariable oldVar, LogicalVariable newVar);
+
     /**
      * @return a copy of the {@link IProjectionFiltrationInfo}
      */
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/AllVariablesSubstituteVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/AllVariablesSubstituteVisitor.java
new file mode 100644
index 0000000..06d2181
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/AllVariablesSubstituteVisitor.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+
+/**
+ * Substitute all variables with the provided one
+ * Example substitute all variables with $$myVar:
+ * add($$1, $$2) --> add($$myVar, $$myVar)
+ */
+public class AllVariablesSubstituteVisitor implements ILogicalExpressionVisitor<Void, LogicalVariable> {
+    @Override
+    public Void visitConstantExpression(ConstantExpression expr, LogicalVariable arg) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitVariableReferenceExpression(VariableReferenceExpression expr, LogicalVariable arg)
+            throws AlgebricksException {
+        expr.setVariable(arg);
+        return null;
+    }
+
+    @Override
+    public Void visitAggregateFunctionCallExpression(AggregateFunctionCallExpression expr, LogicalVariable arg)
+            throws AlgebricksException {
+        visitArgs(expr.getArguments(), arg);
+        return null;
+    }
+
+    @Override
+    public Void visitScalarFunctionCallExpression(ScalarFunctionCallExpression expr, LogicalVariable arg)
+            throws AlgebricksException {
+        visitArgs(expr.getArguments(), arg);
+        return null;
+    }
+
+    @Override
+    public Void visitStatefulFunctionCallExpression(StatefulFunctionCallExpression expr, LogicalVariable arg)
+            throws AlgebricksException {
+        visitArgs(expr.getArguments(), arg);
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestingFunctionCallExpression(UnnestingFunctionCallExpression expr, LogicalVariable arg)
+            throws AlgebricksException {
+        visitArgs(expr.getArguments(), arg);
+        return null;
+    }
+
+    private void visitArgs(List<Mutable<ILogicalExpression>> args, LogicalVariable variable)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> funcArg : args) {
+            funcArg.getValue().accept(this, variable);
+        }
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EquivalentVariableExpressionComparatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EquivalentVariableExpressionComparatorVisitor.java
new file mode 100644
index 0000000..233e181
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EquivalentVariableExpressionComparatorVisitor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+
+/**
+ * Allows to compare two expression with the assumption that all variables are equivalent (e.g., two different
+ * variables originated from two different data-scan operators but on the same dataset)
+ */
+public class EquivalentVariableExpressionComparatorVisitor
+        implements ILogicalExpressionVisitor<Boolean, ILogicalExpression> {
+    public static final EquivalentVariableExpressionComparatorVisitor INSTANCE =
+            new EquivalentVariableExpressionComparatorVisitor();
+
+    private EquivalentVariableExpressionComparatorVisitor() {
+    }
+
+    @Override
+    public Boolean visitConstantExpression(ConstantExpression expr, ILogicalExpression arg) throws AlgebricksException {
+        return expr.equals(arg);
+    }
+
+    @Override
+    public Boolean visitVariableReferenceExpression(VariableReferenceExpression expr, ILogicalExpression arg)
+            throws AlgebricksException {
+        if (arg.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+            return Boolean.TRUE;
+        }
+
+        return Boolean.FALSE;
+    }
+
+    @Override
+    public Boolean visitAggregateFunctionCallExpression(AggregateFunctionCallExpression expr, ILogicalExpression arg)
+            throws AlgebricksException {
+        return equals(expr, arg);
+    }
+
+    @Override
+    public Boolean visitScalarFunctionCallExpression(ScalarFunctionCallExpression expr, ILogicalExpression arg)
+            throws AlgebricksException {
+        return equals(expr, arg);
+    }
+
+    @Override
+    public Boolean visitStatefulFunctionCallExpression(StatefulFunctionCallExpression expr, ILogicalExpression arg)
+            throws AlgebricksException {
+        return equals(expr, arg);
+    }
+
+    @Override
+    public Boolean visitUnnestingFunctionCallExpression(UnnestingFunctionCallExpression expr, ILogicalExpression arg)
+            throws AlgebricksException {
+        return equals(expr, arg);
+    }
+
+    public Boolean equals(AbstractFunctionCallExpression expr1, ILogicalExpression expr2) throws AlgebricksException {
+        if (!(expr2 instanceof AbstractFunctionCallExpression)) {
+            return false;
+        } else {
+            AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) expr2;
+            boolean equal = expr1.getFunctionIdentifier().equals(fce.getFunctionIdentifier());
+            if (!equal) {
+                return false;
+            }
+            List<Mutable<ILogicalExpression>> arguments = expr1.getArguments();
+            int argumentCount = arguments.size();
+            List<Mutable<ILogicalExpression>> fceArguments = fce.getArguments();
+            if (argumentCount != fceArguments.size()) {
+                return false;
+            }
+            for (int i = 0; i < argumentCount; i++) {
+                ILogicalExpression argument = arguments.get(i).getValue();
+                ILogicalExpression fceArgument = fceArguments.get(i).getValue();
+                if (argument.accept(this, fceArgument) == Boolean.FALSE) {
+                    return false;
+                }
+            }
+            return Arrays.deepEquals(expr1.getOpaqueParameters(), fce.getOpaqueParameters());
+        }
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 7ceb812..d0b0608 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -140,6 +140,7 @@
             substUsedVariables(op.getMinFilterVars(), pair.first, pair.second);
             substUsedVariables(op.getMaxFilterVars(), pair.first, pair.second);
         }
+        op.getProjectionFiltrationInfo().substituteFilterVariable(pair.first, pair.second);
         return null;
     }
 
@@ -317,6 +318,7 @@
         } else {
             substUsedVariablesInExpr(op.getSelectCondition(), pair.first, pair.second);
         }
+        op.getProjectionFiltrationInfo().substituteFilterVariable(pair.first, pair.second);
         return null;
     }
 
@@ -327,6 +329,7 @@
         if (producedVarFound) {
             substProducedVarInTypeEnvironment(op, pair);
         }
+        op.getProjectionFiltrationInfo().substituteFilterVariable(pair.first, pair.second);
         return null;
     }