Added replicate operator with materialization

- be more aggressive in finding shared plans in ExtractCommonOperatorRule:
 - find all the isomorphic subgraphs instead of just the ones on join build branches
 - while expanding candidates, handle operators with multiple inputs
 - analyze the DAG to find all the operators that can be co-scheduled, and infer the dependencies between clusters
 - based on the dependencies, decide which outputs of a replicate operator need materialization
 - if the shared branch needs materialization, and it consists of only trivial operators (such as assign, unnest, datasource scan), that branch is discarded from the candidates

- modified the replicate operator descriptor to materialize the input if needed, and to read from the materialized file for the outputs that require materialization

- removed redundant decor variables in group-by

- fixed a bug in computing live variables for the unnest-map operator: if the operator does not propagate its inputs, those input variables should no longer be live

- fixed a bug in ComplexUnnestToProductRule

Change-Id: If221d1507844f9409bf1163f93b0c04ef5848578
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/86
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
index f3f9837..dafe300 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.algebricks.core.algebra.base;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -57,4 +58,18 @@
     public void setHostQueryContext(Object context);
 
     public Object getHostQueryContext();
+
+    /**
+     * @return labels (0 or 1) for each input and output indicating the dependency between them.
+     *         The edges labeled as 1 must wait for the edges with label 0.
+     */
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op);
+
+    /*
+     * Enables a rough cost-based decision on whether to merge shared subplans and materialize the result.
+     * If the subgraph whose result we would like to materialize contains a computationally expensive operator, we assume
+     * it is cheaper to materialize the result of this subgraph and read it back from the file than to recompute it.
+     */
+    public boolean expensiveThanMaterialization();
+
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
index e5ece69..3d8c4b6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
@@ -99,7 +99,7 @@
     @Override

     public boolean isMap() {

         return false;

-    }

+    }
 

     public List<LogicalVariable> getDistinctByVarList() {

         List<LogicalVariable> varList = new ArrayList<LogicalVariable>(expressions.size());

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
index f738b50..a456e71 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
@@ -273,5 +273,4 @@
         }
         return env;
     }
-
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
index 6dbd3f5..3f13977 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
@@ -15,8 +15,12 @@
 package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
 
 import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
@@ -28,9 +32,19 @@
 public class ReplicateOperator extends AbstractLogicalOperator {
 
     private int outputArity = 2;
+    private boolean[] outputMaterializationFlags = new boolean[outputArity];
+    private List<Mutable<ILogicalOperator>> outputs;
 
     public ReplicateOperator(int outputArity) {
         this.outputArity = outputArity;
+        this.outputMaterializationFlags = new boolean[outputArity];
+        this.outputs = new ArrayList<Mutable<ILogicalOperator>>();
+    }
+
+    public ReplicateOperator(int outputArity, boolean[] outputMaterializationFlags) {
+        this.outputArity = outputArity;
+        this.outputMaterializationFlags = outputMaterializationFlags;
+        this.outputs = new ArrayList<Mutable<ILogicalOperator>>();
     }
 
     @Override
@@ -71,9 +85,29 @@
         return outputArity;
     }
 
+    public int setOutputArity(int outputArity) {
+        return this.outputArity = outputArity;
+    }
+
+    public boolean[] getOutputMaterializationFlags() {
+        return outputMaterializationFlags;
+    }
+
+    public List<Mutable<ILogicalOperator>> getOutputs() {
+        return outputs;
+    }
+
     @Override
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         return createPropagatingAllInputsTypeEnvironment(ctx);
     }
 
+    public boolean isBlocker() {
+        for (boolean requiresMaterialization : outputMaterializationFlags) {
+            if (requiresMaterialization) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
index 61e0b33..b7a56be 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
@@ -131,5 +131,4 @@
         return !propagateInput;
     }
     */
-
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 1d0fd7c..b8cc3b8 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -224,7 +224,11 @@
 
     @Override
     public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
-        standardLayout(op);
+        if (op.propagatesInput()) {
+            standardLayout(op);
+        } else {
+            VariableUtilities.getProducedVariables(op, schemaVariables);
+        }
         return null;
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
index 25fb453..42d964d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
@@ -40,6 +40,11 @@
         return false;
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+
     public abstract Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(
             IConnectorDescriptorRegistry spec, ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context)
             throws AlgebricksException;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
index 03f03fd..fcc04ab 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
 
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
 
 public abstract class AbstractJoinPOperator extends AbstractPhysicalOperator {
@@ -37,4 +39,16 @@
     public JoinPartitioningType getPartitioningType() {
         return partitioningType;
     }
+
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        int[] inputDependencyLabels = new int[] { 1, 0 };
+        int[] outputDependencyLabels = new int[] { 1 };
+        return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index ac64882..4407b75 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -20,6 +20,7 @@
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -79,6 +80,17 @@
         return disableJobGenBelow;
     }
 
+    /**
+     * @return labels (0 or 1) for each input and output indicating the dependency between them.
+     *         The edges labeled as 1 must wait for the edges with label 0.
+     */
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        int[] inputDependencyLabels = new int[op.getInputs().size()]; // filled with 0's
+        int[] outputDependencyLabels = new int[] { 0 };
+        return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+    }
+
     protected void contributeOpDesc(IHyracksJobBuilder builder, AbstractLogicalOperator op, IOperatorDescriptor opDesc) {
         if (op.getExecutionMode() == ExecutionMode.UNPARTITIONED) {
             AlgebricksPartitionConstraint apc = new AlgebricksCountPartitionConstraint(1);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index c375a3e..74adf5b 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -249,4 +249,8 @@
         return null;
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
index 18b442b..299f519 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
@@ -26,4 +26,8 @@
         return emptyUnaryRequirements();
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index ffc75c8..8cfe067 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -127,4 +127,15 @@
         }
     }
 
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        int[] inputDependencyLabels = new int[] { 0 };
+        int[] outputDependencyLabels = new int[] { 1 };
+        return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 99aec26..8b1b447 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -114,4 +114,8 @@
         return true;
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 55da00e..3e87281 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -99,4 +99,8 @@
         this.flushFramesRapidly = flushFramesRapidly;
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index 1ba07be..54fdf2c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -106,4 +106,9 @@
         ILogicalOperator src = resultOp.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, resultOp, 0);
     }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
index 107aebd..b714979 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
@@ -65,4 +65,8 @@
         builder.contributeMicroOperator(op, runtime, recDesc);
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index da70ba7..fe438a5 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -256,4 +256,16 @@
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
     }
+
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        int[] inputDependencyLabels = new int[] { 0 };
+        int[] outputDependencyLabels = new int[] { 1 };
+        return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
index 0c07c4c..b56d638 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
@@ -124,4 +124,8 @@
         return false;
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
index 5829911..d254151 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
@@ -114,4 +114,8 @@
         return false;
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
index f2b52da..f68b327 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
@@ -71,4 +71,8 @@
         builder.contributeMicroOperator(op, runtime, recDesc);
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
index cd99155..3f78793 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
@@ -136,4 +136,8 @@
         return PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY;
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
index c00e8f5..d8f7d99 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -64,10 +65,31 @@
 
         ReplicateOperator rop = (ReplicateOperator) op;
         int outputArity = rop.getOutputArity();
+        boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
 
-        SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity);
+        SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity, outputMaterializationFlags);
         contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
     }
+
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        int[] inputDependencyLabels = new int[] { 0 };
+        ReplicateOperator rop = (ReplicateOperator) op;
+        int[] outputDependencyLabels = new int[rop.getOutputArity()];
+        // change the labels of the outputs that require materialization to 1
+        boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
+        for (int i = 0; i < rop.getOutputArity(); i++) {
+            if (outputMaterializationFlags[i]) {
+                outputDependencyLabels[i] = 1;
+            }
+        }
+        return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index 956c74f..3b2387a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -30,8 +30,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
@@ -99,4 +97,8 @@
 
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
index c5fa192..aa24221 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -64,4 +64,8 @@
         builder.contributeGraphEdge(src, 0, op, 0);
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index fea9701..9105c1f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -104,4 +104,8 @@
         builder.contributeGraphEdge(src, 0, write, 0);
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
index e92e3ee..48a0fa6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -269,4 +269,16 @@
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
     }
+
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        int[] inputDependencyLabels = new int[] { 0 };
+        int[] outputDependencyLabels = new int[] { 1 };
+        return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
index 324709a..63ddffe 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
@@ -100,4 +100,8 @@
         builder.contributeGraphEdge(src, 0, limit, 0);
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
index fa7d34e..2ad5fda 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
@@ -81,4 +81,8 @@
         this.flushFramesRapidly = flushFramesRapidly;
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
index bbf8e9a..3999fa6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
@@ -73,4 +73,8 @@
         builder.contributeGraphEdge(src, 0, select, 0);
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
index 39271bb..e6a0be7 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
@@ -75,4 +75,8 @@
         computeDeliveredPropertiesForUsedVariables(s, s.getInputVariables());
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
index 28df9fe..efc609a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
@@ -104,4 +104,8 @@
         builder.contributeGraphEdge(src, 0, op, 0);
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
index 7fb6ddc..cda3b00 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -98,4 +98,8 @@
         builder.contributeGraphEdge(src2, 0, op, 1);
     }
 
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
index 8b241d3..f399184 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
@@ -103,4 +103,9 @@
         ILogicalOperator src = writeResultOp.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, writeResultOp, 0);
     }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index 4ae956a..e1de495 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -41,7 +41,6 @@
 import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
 
 public class JobGenContext {
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index 961e527..3af57ad 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -26,7 +26,9 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -43,7 +45,8 @@
         return context;
     }
 
-    public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema, IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
+    public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema,
+            IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
         JobSpecification spec = new JobSpecification(context.getFrameSize());
         if (jobEventListenerFactory != null) {
             spec.setJobletEventListenerFactory(jobEventListenerFactory);
@@ -63,8 +66,8 @@
         return spec;
     }
 
-    private void compileOpRef(Mutable<ILogicalOperator> opRef, IOperatorDescriptorRegistry spec, IHyracksJobBuilder builder,
-            IOperatorSchema outerPlanSchema) throws AlgebricksException {
+    private void compileOpRef(Mutable<ILogicalOperator> opRef, IOperatorDescriptorRegistry spec,
+            IHyracksJobBuilder builder, IOperatorSchema outerPlanSchema) throws AlgebricksException {
         ILogicalOperator op = opRef.getValue();
         int n = op.getInputs().size();
         IOperatorSchema[] schemas = new IOperatorSchema[n];
@@ -100,10 +103,21 @@
             Mutable<ILogicalOperator> child = entry.getKey();
             List<Mutable<ILogicalOperator>> parents = entry.getValue();
             if (parents.size() > 1) {
-                int i = 0;
-                for (Mutable<ILogicalOperator> parent : parents) {
-                    builder.contributeGraphEdge(child.getValue(), i, parent.getValue(), 0);
-                    i++;
+                if (child.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+                    ReplicateOperator rop = (ReplicateOperator) child.getValue();
+                    if (rop.isBlocker()) {
+                        // make the order of the graph edges consistent with the order of rop's outputs
+                        List<Mutable<ILogicalOperator>> outputs = rop.getOutputs();
+                        for (Mutable<ILogicalOperator> parent : parents) {
+                            builder.contributeGraphEdge(child.getValue(), outputs.indexOf(parent), parent.getValue(), 0);
+                        }
+                    } else {
+                        int i = 0;
+                        for (Mutable<ILogicalOperator> parent : parents) {
+                            builder.contributeGraphEdge(child.getValue(), i, parent.getValue(), 0);
+                            i++;
+                        }
+                    }
                 }
             }
         }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
index 5e6bcae..910a8f9 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
@@ -264,7 +264,7 @@
                     if (targetUsedVars == null) {
                         return false;
                     }
-                } else {
+                } else if (innerMatches != 0 && outerMatches != 0) {
                     // The current operator produces variables that are used by both partitions, so the inner and outer are not independent and, therefore, we cannot create a join.
                     // TODO: We may still be able to split the operator to create a viable partitioning.
                     return false;
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 06b3c06..691006f 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -15,15 +15,19 @@
 package edu.uci.ics.hyracks.algebricks.rewriter.rules;
 
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.mutable.MutableObject;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -48,8 +52,11 @@
 
     private HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childrenToParents = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
     private List<Mutable<ILogicalOperator>> roots = new ArrayList<Mutable<ILogicalOperator>>();
-    private List<Mutable<ILogicalOperator>> joins = new ArrayList<Mutable<ILogicalOperator>>();
     private List<List<Mutable<ILogicalOperator>>> equivalenceClasses = new ArrayList<List<Mutable<ILogicalOperator>>>();
+    private HashMap<Mutable<ILogicalOperator>, BitSet> opToCandidateInputs = new HashMap<Mutable<ILogicalOperator>, BitSet>();
+    private HashMap<Mutable<ILogicalOperator>, MutableInt> clusterMap = new HashMap<Mutable<ILogicalOperator>, MutableInt>();
+    private HashMap<Integer, BitSet> clusterWaitForMap = new HashMap<Integer, BitSet>();
+    private int lastUsedClusterId = 0;
 
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
@@ -78,17 +85,18 @@
                 changed = false;
                 // applying the rewriting until fixpoint
                 topDownMaterialization(roots);
-                removeNonJoinBuildBranchCandidates();
                 genCandidates(context);
                 removeTrivialShare();
-                removeNonJoinBuildBranchCandidates();
                 if (equivalenceClasses.size() > 0)
                     changed = rewrite(context);
                 if (!rewritten)
                     rewritten = changed;
                 equivalenceClasses.clear();
                 childrenToParents.clear();
-                joins.clear();
+                opToCandidateInputs.clear();
+                clusterMap.clear();
+                clusterWaitForMap.clear();
+                lastUsedClusterId = 0;
             } while (changed);
             roots.clear();
         }
@@ -111,41 +119,6 @@
                 equivalenceClasses.remove(i);
     }
 
-    private void removeNonJoinBuildBranchCandidates() {
-        for (List<Mutable<ILogicalOperator>> candidates : equivalenceClasses) {
-            for (int i = candidates.size() - 1; i >= 0; i--) {
-                Mutable<ILogicalOperator> opRef = candidates.get(i);
-                boolean reserve = false;
-                for (Mutable<ILogicalOperator> join : joins)
-                    if (isInJoinBuildBranch(join, opRef)) {
-                        reserve = true;
-                    }
-                if (!reserve)
-                    candidates.remove(i);
-            }
-        }
-        for (int i = equivalenceClasses.size() - 1; i >= 0; i--)
-            if (equivalenceClasses.get(i).size() < 2)
-                equivalenceClasses.remove(i);
-    }
-
-    private boolean isInJoinBuildBranch(Mutable<ILogicalOperator> joinRef, Mutable<ILogicalOperator> opRef) {
-        Mutable<ILogicalOperator> buildBranch = joinRef.getValue().getInputs().get(1);
-        do {
-            if (buildBranch.equals(opRef)) {
-                return true;
-            } else {
-                AbstractLogicalOperator aop = (AbstractLogicalOperator) buildBranch.getValue();
-                if (aop.getOperatorTag() == LogicalOperatorTag.INNERJOIN
-                        || aop.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN
-                        || buildBranch.getValue().getInputs().size() == 0)
-                    return false;
-                else
-                    buildBranch = buildBranch.getValue().getInputs().get(0);
-            }
-        } while (true);
-    }
-
     private boolean rewrite(IOptimizationContext context) throws AlgebricksException {
         boolean changed = false;
         for (List<Mutable<ILogicalOperator>> members : equivalenceClasses) {
@@ -170,40 +143,47 @@
                     members.remove(i);
                 }
             }
-            AbstractLogicalOperator rop = new ReplicateOperator(group.size());
+            boolean[] materializationFlags = computeMaterilizationFlags(group);
+            if (group.isEmpty()) {
+                continue;
+            }
+            candidate = group.get(0);
+            ReplicateOperator rop = new ReplicateOperator(group.size(), materializationFlags);
             rop.setPhysicalOperator(new ReplicatePOperator());
             rop.setExecutionMode(ExecutionMode.PARTITIONED);
             Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(rop);
             AbstractLogicalOperator aopCandidate = (AbstractLogicalOperator) candidate.getValue();
+            List<Mutable<ILogicalOperator>> originalCandidateParents = childrenToParents.get(candidate);
 
             if (aopCandidate.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
                 rop.getInputs().add(candidate);
             } else {
                 AbstractLogicalOperator beforeExchange = new ExchangeOperator();
                 beforeExchange.setPhysicalOperator(new OneToOneExchangePOperator());
+                Mutable<ILogicalOperator> beforeExchangeRef = new MutableObject<ILogicalOperator>(beforeExchange);
                 beforeExchange.getInputs().add(candidate);
                 context.computeAndSetTypeEnvironmentForOperator(beforeExchange);
-                rop.getInputs().add(new MutableObject<ILogicalOperator>(beforeExchange));
+                rop.getInputs().add(beforeExchangeRef);
             }
             context.computeAndSetTypeEnvironmentForOperator(rop);
 
-            List<Mutable<ILogicalOperator>> parents = childrenToParents.get(candidate);
-            for (Mutable<ILogicalOperator> parentRef : parents) {
+            for (Mutable<ILogicalOperator> parentRef : originalCandidateParents) {
                 AbstractLogicalOperator parent = (AbstractLogicalOperator) parentRef.getValue();
                 int index = parent.getInputs().indexOf(candidate);
                 if (parent.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
                     parent.getInputs().set(index, ropRef);
+                    rop.getOutputs().add(parentRef);
                 } else {
                     AbstractLogicalOperator exchange = new ExchangeOperator();
                     exchange.setPhysicalOperator(new OneToOneExchangePOperator());
+                    MutableObject<ILogicalOperator> exchangeRef = new MutableObject<ILogicalOperator>(exchange);
                     exchange.getInputs().add(ropRef);
+                    rop.getOutputs().add(exchangeRef);
                     context.computeAndSetTypeEnvironmentForOperator(exchange);
-                    // parent.getInputs().get(index).setValue(exchange);
-                    parent.getInputs().set(index, new MutableObject<ILogicalOperator>(exchange));
+                    parent.getInputs().set(index, exchangeRef);
                     context.computeAndSetTypeEnvironmentForOperator(parent);
                 }
             }
-
             List<LogicalVariable> liveVarsNew = new ArrayList<LogicalVariable>();
             VariableUtilities.getLiveVariables(candidate.getValue(), liveVarsNew);
             ArrayList<Mutable<ILogicalExpression>> assignExprs = new ArrayList<Mutable<ILogicalExpression>>();
@@ -226,9 +206,11 @@
                 AbstractLogicalOperator exchOp = new ExchangeOperator();
                 exchOp.setPhysicalOperator(new OneToOneExchangePOperator());
                 exchOp.getInputs().add(ropRef);
-
-                assignOperator.getInputs().add(new MutableObject<ILogicalOperator>(exchOp));
+                MutableObject<ILogicalOperator> exchOpRef = new MutableObject<ILogicalOperator>(exchOp);
+                rop.getOutputs().add(exchOpRef);
+                assignOperator.getInputs().add(exchOpRef);
                 projectOperator.getInputs().add(new MutableObject<ILogicalOperator>(assignOperator));
+
                 // set the types
                 context.computeAndSetTypeEnvironmentForOperator(exchOp);
                 context.computeAndSetTypeEnvironmentForOperator(assignOperator);
@@ -247,18 +229,18 @@
                         }
                     }
 
-                    AbstractLogicalOperator exchg = new ExchangeOperator();
-                    exchg.setPhysicalOperator(new OneToOneExchangePOperator());
-
                     ILogicalOperator childOp = parentOp.getOperatorTag() == LogicalOperatorTag.PROJECT ? assignOperator
                             : projectOperator;
                     if (parentOp.isMap()) {
                         parentOp.getInputs().set(index, new MutableObject<ILogicalOperator>(childOp));
                     } else {
+                        AbstractLogicalOperator exchg = new ExchangeOperator();
+                        exchg.setPhysicalOperator(new OneToOneExchangePOperator());
                         exchg.getInputs().add(new MutableObject<ILogicalOperator>(childOp));
                         parentOp.getInputs().set(index, new MutableObject<ILogicalOperator>(exchg));
+                        context.computeAndSetTypeEnvironmentForOperator(exchg);
                     }
-                    context.computeAndSetTypeEnvironmentForOperator(exchg);
+                    context.computeAndSetTypeEnvironmentForOperator(parentOp);
                 }
             }
             rewritten = true;
@@ -302,11 +284,6 @@
         List<Mutable<ILogicalOperator>> candidates = new ArrayList<Mutable<ILogicalOperator>>();
         List<Mutable<ILogicalOperator>> nextLevel = new ArrayList<Mutable<ILogicalOperator>>();
         for (Mutable<ILogicalOperator> op : tops) {
-            AbstractLogicalOperator aop = (AbstractLogicalOperator) op.getValue();
-            if ((aop.getOperatorTag() == LogicalOperatorTag.INNERJOIN || aop.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN)
-                    && !joins.contains(op)) {
-                joins.add(op);
-            }
             for (Mutable<ILogicalOperator> opRef : op.getValue().getInputs()) {
                 List<Mutable<ILogicalOperator>> opRefList = childrenToParents.get(opRef);
                 if (opRefList == null) {
@@ -335,19 +312,34 @@
         candidates.clear();
         boolean validCandidate = false;
         for (Mutable<ILogicalOperator> op : opList) {
-            for (Mutable<ILogicalOperator> inputRef : op.getValue().getInputs()) {
+            List<Mutable<ILogicalOperator>> inputs = op.getValue().getInputs();
+            for (int i = 0; i < inputs.size(); i++) {
+                Mutable<ILogicalOperator> inputRef = inputs.get(i);
                 validCandidate = false;
-                // if current input is in candidates
-                for (Mutable<ILogicalOperator> candidate : previousCandidates)
-                    if (inputRef.getValue().equals(candidate.getValue()))
-                        validCandidate = true;
-                // if one input is not in candidates
-                if (!validCandidate)
-                    break;
+                for (Mutable<ILogicalOperator> candidate : previousCandidates) {
+                    // if current input is in candidates
+                    if (inputRef.getValue().equals(candidate.getValue())) {
+                        if (inputs.size() == 1) {
+                            validCandidate = true;
+                        } else {
+                            BitSet candidateInputBitMap = opToCandidateInputs.get(op);
+                            if (candidateInputBitMap == null) {
+                                candidateInputBitMap = new BitSet(inputs.size());
+                                opToCandidateInputs.put(op, candidateInputBitMap);
+                            }
+                            candidateInputBitMap.set(i);
+                            if (candidateInputBitMap.cardinality() == inputs.size()) {
+                                validCandidate = true;
+                            }
+                        }
+                        break;
+                    }
+                }
             }
             if (!validCandidate)
                 continue;
-            candidates.add(op);
+            if (!candidates.contains(op))
+                candidates.add(op);
         }
     }
 
@@ -390,4 +382,122 @@
         }
     }
 
+    private boolean[] computeMaterilizationFlags(List<Mutable<ILogicalOperator>> group) {
+        lastUsedClusterId = 0;
+        for (Mutable<ILogicalOperator> root : roots) {
+            computeClusters(null, root, new MutableInt(++lastUsedClusterId));
+        }
+        boolean[] materializationFlags = new boolean[group.size()];
+        boolean worthMaterialization = worthMaterialization(group.get(0));
+        boolean requiresMaterialization;
+        // get clusterIds for each candidate in the group
+        List<Integer> groupClusterIds = new ArrayList<Integer>(group.size());
+        for (int i = 0; i < group.size(); i++) {
+            groupClusterIds.add(clusterMap.get(group.get(i)).getValue());
+        }
+        for (int i = group.size() - 1; i >= 0; i--) {
+            requiresMaterialization = requiresMaterialization(groupClusterIds, i);
+            if (requiresMaterialization && !worthMaterialization) {
+                group.remove(i);
+                groupClusterIds.remove(i);
+            }
+            materializationFlags[i] = requiresMaterialization;
+        }
+        if (group.size() < 2) {
+            group.clear();
+        }
+        // if materialization is not worthwhile, the flags for all the remaining candidates must be false
+        return worthMaterialization ? materializationFlags : new boolean[group.size()];
+    }
+
+    private boolean requiresMaterialization(List<Integer> groupClusterIds, int index) {
+        Integer clusterId = groupClusterIds.get(index);
+        BitSet blockingClusters = new BitSet();
+        getAllBlockingClusterIds(clusterId, blockingClusters);
+        if (!blockingClusters.isEmpty()) {
+            for (int i = 0; i < groupClusterIds.size(); i++) {
+                if (i == index) {
+                    continue;
+                }
+                if (blockingClusters.get(groupClusterIds.get(i))) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private void getAllBlockingClusterIds(int clusterId, BitSet blockingClusters) {
+        BitSet waitFor = clusterWaitForMap.get(clusterId);
+        if (waitFor != null) {
+            for (int i = waitFor.nextSetBit(0); i >= 0; i = waitFor.nextSetBit(i + 1)) {
+                getAllBlockingClusterIds(i, blockingClusters);
+            }
+            blockingClusters.or(waitFor);
+        }
+    }
+
+    private void computeClusters(Mutable<ILogicalOperator> parentRef, Mutable<ILogicalOperator> opRef,
+            MutableInt currentClusterId) {
+        // only the replicate operator has multiple outputs; for any other operator outputIndex stays 0
+        int outputIndex = 0;
+        if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+            ReplicateOperator rop = (ReplicateOperator) opRef.getValue();
+            List<Mutable<ILogicalOperator>> outputs = rop.getOutputs();
+            for (outputIndex = 0; outputIndex < outputs.size(); outputIndex++) {
+                if (outputs.get(outputIndex).equals(parentRef)) {
+                    break;
+                }
+            }
+        }
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) opRef.getValue();
+        Pair<int[], int[]> labels = aop.getPhysicalOperator().getInputOutputDependencyLabels(opRef.getValue());
+        List<Mutable<ILogicalOperator>> inputs = opRef.getValue().getInputs();
+        for (int i = 0; i < inputs.size(); i++) {
+            Mutable<ILogicalOperator> inputRef = inputs.get(i);
+            if (labels.second[outputIndex] == 1 && labels.first[i] == 0) { // 1 -> 0
+                if (labels.second.length == 1) {
+                    clusterMap.put(opRef, currentClusterId);
+                    // start a new cluster
+                    MutableInt newClusterId = new MutableInt(++lastUsedClusterId);
+                    computeClusters(opRef, inputRef, newClusterId);
+                    BitSet waitForList = clusterWaitForMap.get(currentClusterId.getValue());
+                    if (waitForList == null) {
+                        waitForList = new BitSet();
+                        clusterWaitForMap.put(currentClusterId.getValue(), waitForList);
+                    }
+                    waitForList.set(newClusterId.getValue());
+                }
+            } else { // 0 -> 0 and 1 -> 1
+                MutableInt prevClusterId = clusterMap.get(opRef);
+                if (prevClusterId == null || prevClusterId.getValue().equals(currentClusterId.getValue())) {
+                    clusterMap.put(opRef, currentClusterId);
+                    computeClusters(opRef, inputRef, currentClusterId);
+                } else {
+                    // merge currentClusterId into prevClusterId: every wait-for entry that references currentClusterId is redirected to prevClusterId
+                    for (BitSet bs : clusterWaitForMap.values()) {
+                        if (bs.get(currentClusterId.getValue())) {
+                            bs.clear(currentClusterId.getValue());
+                            bs.set(prevClusterId.getValue());
+                        }
+                    }
+                    currentClusterId.setValue(prevClusterId.getValue());
+                }
+            }
+        }
+    }
+
+    private boolean worthMaterialization(Mutable<ILogicalOperator> candidate) {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) candidate.getValue();
+        if (aop.getPhysicalOperator().expensiveThanMaterialization()) {
+            return true;
+        }
+        List<Mutable<ILogicalOperator>> inputs = candidate.getValue().getInputs();
+        for (Mutable<ILogicalOperator> inputRef : inputs) {
+            if (worthMaterialization(inputRef)) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
index bc05181..c34a52f 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
@@ -48,22 +48,20 @@
  * Replaces redundant variable references with their bottom-most equivalent representative.
  * Does a DFS sweep over the plan keeping track of variable equivalence classes.
  * For example, this rule would perform the following rewrite.
- * 
  * Before Plan:
  * select (function-call: func, Args:[%0->$$11])
- *   project [$11]
- *     assign [$$11] <- [$$10]
- *       assign [$$10] <- [$$9]
- *         assign [$$9] <- ...
- *           ...
- *           
+ *   project [$$11]
+ *     assign [$$11] <- [$$10]
+ *       assign [$$10] <- [$$9]
+ *         assign [$$9] <- ...
+ *           ...
 * After Plan:
 * select (function-call: func, Args:[%0->$$9])
+ *   project [$$9]
+ *     assign [$$11] <- [$$9]
+ *       assign [$$10] <- [$$9]
+ *         assign [$$9] <- ...
+ *           ...
+ * project [$9]
+ * assign [$$11] <- [$$9]
+ * assign [$$10] <- [$$9]
+ * assign [$$9] <- ...
+ * ...
  */
 public class RemoveRedundantVariablesRule implements IAlgebraicRewriteRule {
 
@@ -71,7 +69,7 @@
     private final Map<LogicalVariable, List<LogicalVariable>> equivalentVarsMap = new HashMap<LogicalVariable, List<LogicalVariable>>();
 
     protected boolean hasRun = false;
-    
+
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
         return false;
@@ -138,7 +136,7 @@
             if (replaceProjectVars((ProjectOperator) op)) {
                 modified = true;
             }
-        } else if(op.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
+        } else if (op.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
             // Replace redundant variables manually in the UnionAll operator.
             if (replaceUnionAllVars((UnionAllOperator) op)) {
                 modified = true;
@@ -205,6 +203,25 @@
                 }
             }
         }
+        // find the redundant variables within the decor list
+        Map<LogicalVariable, LogicalVariable> variableToFirstDecorMap = new HashMap<LogicalVariable, LogicalVariable>();
+        Iterator<Pair<LogicalVariable, Mutable<ILogicalExpression>>> iter = groupOp.getDecorList().iterator();
+        while (iter.hasNext()) {
+            Pair<LogicalVariable, Mutable<ILogicalExpression>> dp = iter.next();
+            if (dp.first == null || dp.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                continue;
+            }
+            LogicalVariable dv = ((VariableReferenceExpression) dp.second.getValue()).getVariableReference();
+            LogicalVariable firstDecor = variableToFirstDecorMap.get(dv);
+            if (firstDecor == null) {
+                variableToFirstDecorMap.put(dv, dp.first);
+            } else {
+                // The decor variable dp.first is redundant since firstDecor is exactly the same.
+                updateEquivalenceClassMap(dp.first, firstDecor);
+                iter.remove();
+                modified = true;
+            }
+        }
         return modified;
     }
 
@@ -251,7 +268,7 @@
         }
         return modified;
     }
-    
+
     private class VariableSubstitutionVisitor implements ILogicalExpressionReferenceTransform {
         @Override
         public boolean transform(Mutable<ILogicalExpression> exprRef) {
diff --git a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q7_volume_shipping.plan b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q7_volume_shipping.plan
index 9f62e7b..70d8e7b 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q7_volume_shipping.plan
+++ b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q7_volume_shipping.plan
@@ -14,38 +14,58 @@
             -- NESTED_LOOP  |PARTITIONED|
               exchange 
               -- BROADCAST_EXCHANGE  |PARTITIONED|
-                select (function-call: algebricks:eq, Args:[%0->$$2, GERMANY])
-                -- STREAM_SELECT  |PARTITIONED|
-                  exchange 
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    data-scan [$$1, $$2]<-[$$1, $$2, $$3, $$4] <- default.nation
-                    -- DATASOURCE_SCAN  |PARTITIONED|
-                      exchange 
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        empty-tuple-source
-                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                project ([$$1, $$2])
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$1, $$2] <- [%0->$$9, %0->$$10]
+                  -- ASSIGN  |PARTITIONED|
+                    exchange 
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      replicate 
+                      -- SPLIT  |PARTITIONED|
+                        exchange 
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          select (function-call: algebricks:eq, Args:[%0->$$10, GERMANY])
+                          -- STREAM_SELECT  |PARTITIONED|
+                            exchange 
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              replicate 
+                              -- SPLIT  |PARTITIONED|
+                                exchange 
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  data-scan [$$9, $$10]<-[$$9, $$10, $$11, $$12] <- default.nation
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    exchange 
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      empty-tuple-source
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
               exchange 
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 project ([$$5, $$6])
                 -- STREAM_PROJECT  |PARTITIONED|
-                  select (function-call: algebricks:eq, Args:[%0->$$6, FRANCE])
-                  -- STREAM_SELECT  |PARTITIONED|
-                    project ([$$5, $$6])
-                    -- STREAM_PROJECT  |PARTITIONED|
-                      assign [$$5, $$6] <- [%0->$$9, %0->$$10]
-                      -- ASSIGN  |PARTITIONED|
-                        exchange 
-                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          replicate 
-                          -- SPLIT  |PARTITIONED|
-                            exchange 
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              data-scan [$$9, $$10]<-[$$9, $$10, $$11, $$12] <- default.nation
-                              -- DATASOURCE_SCAN  |PARTITIONED|
-                                exchange 
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  empty-tuple-source
-                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  exchange 
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    replicate 
+                    -- SPLIT  |PARTITIONED|
+                      exchange 
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        select (function-call: algebricks:eq, Args:[%0->$$6, FRANCE])
+                        -- STREAM_SELECT  |PARTITIONED|
+                          project ([$$5, $$6])
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            assign [$$5, $$6] <- [%0->$$9, %0->$$10]
+                            -- ASSIGN  |PARTITIONED|
+                              exchange 
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                replicate 
+                                -- SPLIT  |PARTITIONED|
+                                  exchange 
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    data-scan [$$9, $$10]<-[$$9, $$10, $$11, $$12] <- default.nation
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      exchange 
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        empty-tuple-source
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
       exchange 
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
         project ([$$10, $$14, $$9, $$13])
@@ -56,34 +76,58 @@
             -- NESTED_LOOP  |PARTITIONED|
               exchange 
               -- BROADCAST_EXCHANGE  |PARTITIONED|
-                select (function-call: algebricks:eq, Args:[%0->$$14, FRANCE])
-                -- STREAM_SELECT  |PARTITIONED|
-                  exchange 
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    data-scan [$$13, $$14]<-[$$13, $$14, $$15, $$16] <- default.nation
-                    -- DATASOURCE_SCAN  |PARTITIONED|
-                      exchange 
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        empty-tuple-source
-                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-              exchange 
-              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                project ([$$9, $$10])
+                project ([$$13, $$14])
                 -- STREAM_PROJECT  |PARTITIONED|
-                  select (function-call: algebricks:eq, Args:[%0->$$10, GERMANY])
-                  -- STREAM_SELECT  |PARTITIONED|
+                  assign [$$13, $$14] <- [%0->$$5, %0->$$6]
+                  -- ASSIGN  |PARTITIONED|
                     exchange 
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                       replicate 
                       -- SPLIT  |PARTITIONED|
                         exchange 
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          data-scan [$$9, $$10]<-[$$9, $$10, $$11, $$12] <- default.nation
-                          -- DATASOURCE_SCAN  |PARTITIONED|
-                            exchange 
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              empty-tuple-source
-                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                          select (function-call: algebricks:eq, Args:[%0->$$6, FRANCE])
+                          -- STREAM_SELECT  |PARTITIONED|
+                            project ([$$5, $$6])
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              assign [$$5, $$6] <- [%0->$$9, %0->$$10]
+                              -- ASSIGN  |PARTITIONED|
+                                exchange 
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  replicate 
+                                  -- SPLIT  |PARTITIONED|
+                                    exchange 
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      data-scan [$$9, $$10]<-[$$9, $$10, $$11, $$12] <- default.nation
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        exchange 
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          empty-tuple-source
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              exchange 
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                project ([$$9, $$10])
+                -- STREAM_PROJECT  |PARTITIONED|
+                  exchange 
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    replicate 
+                    -- SPLIT  |PARTITIONED|
+                      exchange 
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        select (function-call: algebricks:eq, Args:[%0->$$10, GERMANY])
+                        -- STREAM_SELECT  |PARTITIONED|
+                          exchange 
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            replicate 
+                            -- SPLIT  |PARTITIONED|
+                              exchange 
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                data-scan [$$9, $$10]<-[$$9, $$10, $$11, $$12] <- default.nation
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  exchange 
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    empty-tuple-source
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
 write [%0->$$47, %0->$$48, %0->$$49, %0->$$50]
 -- SINK_WRITE  |PARTITIONED|
   project ([$$47, $$48, $$49, $$50])
diff --git a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u10_nestedloop_join.plan b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u10_nestedloop_join.plan
index c86d57f..a2bb4e4 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u10_nestedloop_join.plan
+++ b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u10_nestedloop_join.plan
@@ -8,17 +8,31 @@
       -- NESTED_LOOP  |PARTITIONED|
         exchange 
         -- BROADCAST_EXCHANGE  |PARTITIONED|
-          data-scan [$$1, $$2]<-[$$1, $$2, $$3, $$4] <- default.nation
-          -- DATASOURCE_SCAN  |PARTITIONED|
-            exchange 
-            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              empty-tuple-source
-              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+          project ([$$1, $$2])
+          -- STREAM_PROJECT  |PARTITIONED|
+            assign [$$1, $$2] <- [%0->$$5, %0->$$6]
+            -- ASSIGN  |PARTITIONED|
+              exchange 
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                replicate 
+                -- SPLIT  |PARTITIONED|
+                  exchange 
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    data-scan [$$5, $$6]<-[$$5, $$6, $$7, $$8] <- default.nation
+                    -- DATASOURCE_SCAN  |PARTITIONED|
+                      exchange 
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        empty-tuple-source
+                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
         exchange 
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-          data-scan [$$5, $$6]<-[$$5, $$6, $$7, $$8] <- default.nation
-          -- DATASOURCE_SCAN  |PARTITIONED|
+          replicate 
+          -- SPLIT  |PARTITIONED|
             exchange 
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              empty-tuple-source
-              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              data-scan [$$5, $$6]<-[$$5, $$6, $$7, $$8] <- default.nation
+              -- DATASOURCE_SCAN  |PARTITIONED|
+                exchange 
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  empty-tuple-source
+                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u3_union.plan b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u3_union.plan
index c4040f2..3923e19 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u3_union.plan
+++ b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u3_union.plan
@@ -12,14 +12,22 @@
           -- ASSIGN  |PARTITIONED|
             select (function-call: algebricks:gt, Args:[function-call: hive:org.apache.hadoop.hive.ql.udf.UDFOPMultiply, Args:[%0->$$1, 2], 50])
             -- STREAM_SELECT  |PARTITIONED|
-              exchange 
-              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                data-scan []<-[$$1, $$2, $$3, $$4, $$5, $$6, $$7] <- default.supplier
-                -- DATASOURCE_SCAN  |PARTITIONED|
+              project ([$$1, $$2, $$3, $$4, $$5, $$6, $$7])
+              -- STREAM_PROJECT  |PARTITIONED|
+                assign [$$1, $$2, $$3, $$4, $$5, $$6, $$7] <- [%0->$$9, %0->$$10, %0->$$11, %0->$$12, %0->$$13, %0->$$14, %0->$$15]
+                -- ASSIGN  |PARTITIONED|
                   exchange 
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    empty-tuple-source
-                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    replicate 
+                    -- SPLIT  |PARTITIONED|
+                      exchange 
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        data-scan []<-[$$9, $$10, $$11, $$12, $$13, $$14, $$15] <- default.supplier
+                        -- DATASOURCE_SCAN  |PARTITIONED|
+                          exchange 
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            empty-tuple-source
+                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
       exchange 
       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
         project ([$$16, $$11, $$12, $$10])
@@ -30,9 +38,13 @@
             -- STREAM_SELECT  |PARTITIONED|
               exchange 
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                data-scan []<-[$$9, $$10, $$11, $$12, $$13, $$14, $$15] <- default.supplier
-                -- DATASOURCE_SCAN  |PARTITIONED|
+                replicate 
+                -- SPLIT  |PARTITIONED|
                   exchange 
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    empty-tuple-source
-                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    data-scan []<-[$$9, $$10, $$11, $$12, $$13, $$14, $$15] <- default.supplier
+                    -- DATASOURCE_SCAN  |PARTITIONED|
+                      exchange 
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        empty-tuple-source
+                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index f3f8a2f..700dec8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -23,7 +23,6 @@
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 
 public abstract class AbstractOperatorDescriptor implements IOperatorDescriptor {
     private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java
new file mode 100644
index 0000000..48de837
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+
+/**
+ * State object that spools frames into a managed workspace run file and replays them on demand.
+ * <p>
+ * Intended call order: {@link #open(IHyracksTaskContext)}, {@link #appendFrame(ByteBuffer)} per frame,
+ * {@link #close()}, {@link #writeOut(IFrameWriter, ByteBuffer)} per consumer, then {@link #deleteFile()}.
+ */
+public class MaterializerTaskState extends AbstractStateObject {
+    // Writer of the spool file; stays null until open() has created it.
+    private RunFileWriter out;
+
+    public MaterializerTaskState(JobId jobId, TaskId taskId) {
+        super(jobId, taskId);
+    }
+
+    @Override
+    public void toBytes(DataOutput out) throws IOException {
+        // Intentionally empty: no fields are written.
+    }
+
+    @Override
+    public void fromBytes(DataInput in) throws IOException {
+        // Intentionally empty: no fields are read.
+    }
+
+    /** Creates the managed workspace file for this state and opens it for writing. */
+    public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+                MaterializerTaskState.class.getSimpleName());
+        out = new RunFileWriter(file, ctx.getIOManager());
+        out.open();
+    }
+
+    /** Closes the spool file writer; a no-op if {@link #open(IHyracksTaskContext)} never created one. */
+    public void close() throws HyracksDataException {
+        if (out != null) {
+            out.close();
+        }
+    }
+
+    /** Appends one frame to the spool file. */
+    public void appendFrame(ByteBuffer buffer) throws HyracksDataException {
+        out.nextFrame(buffer);
+    }
+
+    /**
+     * Replays every spooled frame to {@code writer}, which is opened before and closed after the replay.
+     *
+     * @param writer destination of the frames; {@code fail()} is called on it if the replay throws
+     * @param frame scratch buffer each frame is read into
+     * @throws HyracksDataException wrapping any exception raised while reading or forwarding
+     */
+    public void writeOut(IFrameWriter writer, ByteBuffer frame) throws HyracksDataException {
+        RunFileReader in = out.createReader();
+        writer.open();
+        try {
+            in.open();
+            try {
+                while (in.nextFrame(frame)) {
+                    frame.flip();
+                    writer.nextFrame(frame);
+                    frame.clear();
+                }
+            } finally {
+                // Release the reader even when reading or forwarding a frame fails.
+                in.close();
+            }
+        } catch (Exception e) {
+            writer.fail();
+            throw new HyracksDataException(e);
+        } finally {
+            writer.close();
+        }
+    }
+
+    /** Deletes the spool file; a no-op if it was never created. */
+    public void deleteFile() {
+        if (out != null) {
+            out.getFileReference().delete();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 89c20d6..3a405d0 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -14,12 +14,8 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.misc;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -28,14 +24,9 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -84,57 +75,6 @@
 
     }
 
-    public static class MaterializerTaskState extends AbstractStateObject {
-        private RunFileWriter out;
-
-        public MaterializerTaskState() {
-        }
-
-        private MaterializerTaskState(JobId jobId, TaskId taskId) {
-            super(jobId, taskId);
-        }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-
-        }
-
-        public void open(IHyracksTaskContext ctx) throws HyracksDataException {
-            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                    MaterializingOperatorDescriptor.class.getSimpleName());
-            out = new RunFileWriter(file, ctx.getIOManager());
-            out.open();
-        }
-
-        public void appendFrame(ByteBuffer buffer) throws HyracksDataException {
-            out.nextFrame(buffer);
-        }
-
-        public void writeOut(IFrameWriter writer, ByteBuffer frame) throws HyracksDataException {
-            RunFileReader in = out.createReader();
-            writer.open();
-            try {
-                in.open();
-                while (in.nextFrame(frame)) {
-                    frame.flip();
-                    writer.nextFrame(frame);
-                    frame.clear();
-                }
-                in.close();
-            } catch (Exception e) {
-                writer.fail();
-                throw new HyracksDataException(e);
-            } finally {
-                writer.close();
-            }
-        }
-    }
-
     private final class MaterializerReaderActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
@@ -166,7 +106,7 @@
 
                 @Override
                 public void close() throws HyracksDataException {
-                    state.out.close();
+                    state.close();
                     ByteBuffer frame = ctx.allocateFrame();
                     state.writeOut(writer, frame);
                 }
@@ -202,7 +142,7 @@
 
                 @Override
                 public void close() throws HyracksDataException {
-                    state.out.close();
+                    state.close();
                     ctx.setStateObject(state);
                 }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index 0e8f303..b8e1ac8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -18,64 +18,222 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
-public class SplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+/**
+ * Operator that copies every input frame to each of its outputs. Outputs flagged for materialization are
+ * not fed directly: the splitter activity spools the input to a run file and a separate reader activity
+ * (one per flagged output) replays that file to the output.
+ */
+public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final long serialVersionUID = 1L;
 
+    private final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0;
+    private final static int MATERIALIZE_READER_ACTIVITY_ID = 1;
+
+    // Per-output flags: true means the output is served from the materialized file.
+    private final boolean[] outputMaterializationFlags;
+    private final boolean requiresMaterialization;
+    private final int numberOfNonMaterializedOutputs;
+    // Number of reader activities, i.e. reader tasks per partition, that replay the materialized file.
+    private final int numberOfMaterializedOutputs;
+
+    /** Creates a split operator in which no output is materialized. */
     public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity) {
+        this(spec, rDesc, outputArity, new boolean[outputArity]);
+    }
+
+    /**
+     * @param spec registry the operator is registered with
+     * @param rDesc record descriptor used for every output
+     * @param outputArity number of outputs
+     * @param outputMaterializationFlags one flag per output; {@code true} makes that output read from the
+     *            materialized copy of the input instead of receiving frames directly
+     */
+    public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity,
+            boolean[] outputMaterializationFlags) {
         super(spec, 1, outputArity);
         for (int i = 0; i < outputArity; i++) {
             recordDescriptors[i] = rDesc;
         }
+        this.outputMaterializationFlags = outputMaterializationFlags;
+        // Derive the counts here rather than in contributeActivities() so they do not depend on how often
+        // (or whether) the activity graph has been built for this descriptor instance.
+        int materialized = 0;
+        for (int i = 0; i < outputArity; i++) {
+            if (outputMaterializationFlags[i]) {
+                materialized++;
+            }
+        }
+        numberOfMaterializedOutputs = materialized;
+        numberOfNonMaterializedOutputs = outputArity - materialized;
+        requiresMaterialization = materialized > 0;
     }
 
+    /**
+     * Registers the splitter activity, wired to the input and to every non-materialized output, plus one
+     * reader activity per materialized output with a blocking edge from the splitter to that reader.
+     */
     @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
-            throws HyracksDataException {
-        return new AbstractUnaryInputOperatorNodePushable() {
-            private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode(new ActivityId(odId,
+                SPLITTER_MATERIALIZER_ACTIVITY_ID));
+        builder.addActivity(this, sma);
+        builder.addSourceEdge(0, sma, 0);
+        int taskOutputIndex = 0;
+        for (int i = 0; i < outputArity; i++) {
+            if (!outputMaterializationFlags[i]) {
+                builder.addTargetEdge(i, sma, taskOutputIndex);
+                taskOutputIndex++;
+            }
+        }
 
-            @Override
-            public void close() throws HyracksDataException {
-                for (IFrameWriter writer : writers) {
-                    writer.close();
+        if (requiresMaterialization) {
+            int activityId = MATERIALIZE_READER_ACTIVITY_ID;
+            for (int i = 0; i < outputArity; i++) {
+                if (outputMaterializationFlags[i]) {
+                    MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(new ActivityId(odId,
+                            activityId));
+                    builder.addActivity(this, mra);
+                    builder.addTargetEdge(i, mra, 0);
+                    builder.addBlockingEdge(sma, mra);
+                    activityId++;
                 }
             }
+        }
+    }
 
-            @Override
-            public void fail() throws HyracksDataException {
-                for (IFrameWriter writer : writers) {
-                    writer.fail();
+    /**
+     * Materialization state that also counts the reader tasks which still have to replay the spooled file.
+     * One instance is created per splitter task, so the count is per partition rather than per descriptor.
+     */
+    private static final class SplitMaterializerState extends MaterializerTaskState {
+        private int pendingReaders;
+
+        private SplitMaterializerState(IHyracksTaskContext ctx, TaskId taskId, int readers) {
+            super(ctx.getJobletContext().getJobId(), taskId);
+            pendingReaders = readers;
+        }
+
+        /** Marks one reader as finished; returns {@code true} only for the last outstanding reader. */
+        private synchronized boolean releaseReader() {
+            pendingReaders--;
+            return pendingReaders == 0;
+        }
+    }
+
+    /** Activity that forwards each frame to the direct outputs and, if required, spools it to a file. */
+    private final class SplitterMaterializerActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public SplitterMaterializerActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            return new AbstractUnaryInputOperatorNodePushable() {
+                private SplitMaterializerState state;
+                private final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs];
+
+                @Override
+                public void open() throws HyracksDataException {
+                    if (requiresMaterialization) {
+                        state = new SplitMaterializerState(ctx, new TaskId(getActivityId(), partition),
+                                numberOfMaterializedOutputs);
+                        state.open(ctx);
+                    }
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        writers[i].open();
+                    }
                 }
-            }
 
-            @Override
-            public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
-                for (IFrameWriter writer : writers) {
-                    FrameUtils.flushFrame(bufferAccessor, writer);
+                @Override
+                public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
+                    if (requiresMaterialization) {
+                        state.appendFrame(bufferAccessor);
+                    }
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        FrameUtils.flushFrame(bufferAccessor, writers[i]);
+                    }
                 }
-            }
 
-            @Override
-            public void open() throws HyracksDataException {
-                for (IFrameWriter writer : writers) {
-                    writer.open();
+                @Override
+                public void close() throws HyracksDataException {
+                    try {
+                        if (state != null) {
+                            state.close();
+                            // Publish the state so the reader activities can find the spooled file.
+                            ctx.setStateObject(state);
+                        }
+                    } finally {
+                        // Close the direct outputs even if closing the spool file failed.
+                        for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                            writers[i].close();
+                        }
+                    }
                 }
-            }
 
-            @Override
-            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                writers[index] = writer;
-            }
-        };
+                @Override
+                public void fail() throws HyracksDataException {
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        writers[i].fail();
+                    }
+                }
+
+                @Override
+                public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+                    writers[index] = writer;
+                }
+            };
+        }
+    }
+
+    /** Activity that replays the spooled file to one materialized output. */
+    private final class MaterializeReaderActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public MaterializeReaderActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+                throws HyracksDataException {
+            return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+                @Override
+                public void initialize() throws HyracksDataException {
+                    ByteBuffer frame = ctx.allocateFrame();
+                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
+                            getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
+                    state.writeOut(writer, frame);
+                }
+
+                @Override
+                public void deinitialize() throws HyracksDataException {
+                    SplitMaterializerState state = (SplitMaterializerState) ctx.getStateObject(new TaskId(
+                            new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
+                    // Only the last reader of this partition may delete the file; the others may still need it.
+                    if (state != null && state.releaseReader()) {
+                        state.deleteFile();
+                    }
+                }
+            };
+        }
     }
 }