Moved MaterializeOperator and NestedSubplanToJoinRule to Hyracks.

Change-Id: I74f62bc26706fc72c1baf05f27ce8cdf219cb778
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/168
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index bface33..9ddd927 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -20,31 +20,32 @@
     DATASOURCESCAN,
     DISTINCT,
     DISTRIBUTE_RESULT,
-    GROUP,
     EMPTYTUPLESOURCE,
     EXCHANGE,
+    EXTENSION_OPERATOR,
+    EXTERNAL_LOOKUP,
+    GROUP,
+    INDEX_INSERT_DELETE,
     INNERJOIN,
+    INSERT_DELETE,
     LEFTOUTERJOIN,
     LIMIT,
+    MATERIALIZE,
     NESTEDTUPLESOURCE,
     ORDER,
-    PROJECT,
     PARTITIONINGSPLIT,
+    PROJECT,
     REPLICATE,
     RUNNINGAGGREGATE,
     SCRIPT,
     SELECT,
     SINK,
     SUBPLAN,
+    TOKENIZE,
     UNIONALL,
     UNNEST,
     UNNEST_MAP,
+    UPDATE,
     WRITE,
     WRITE_RESULT,
-    INSERT_DELETE,
-    INDEX_INSERT_DELETE,
-    UPDATE,
-    EXTENSION_OPERATOR,
-    EXTERNAL_LOOKUP,
-    TOKENIZE
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 48f8230..911bfa1 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -20,50 +20,51 @@
     BROADCAST_EXCHANGE,
     BTREE_SEARCH,
     BULKLOAD,
-    INDEX_BULKLOAD,
-    STATS,
     DATASOURCE_SCAN,
     DISTRIBUTE_RESULT,
     EMPTY_TUPLE_SOURCE,
+    EXTENSION_OPERATOR,
     EXTERNAL_GROUP_BY,
-    IN_MEMORY_HASH_JOIN,
+    EXTERNAL_LOOKUP,
     HASH_GROUP_BY,
     HASH_PARTITION_EXCHANGE,
     HASH_PARTITION_MERGE_EXCHANGE,
-    HYBRID_HASH_JOIN,
     HDFS_READER,
+    HYBRID_HASH_JOIN,
+    IN_MEMORY_HASH_JOIN,
     IN_MEMORY_STABLE_SORT,
+    INDEX_BULKLOAD,
+    INDEX_INSERT_DELETE,
+    INSERT_DELETE,
+    LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH,
+    MATERIALIZE,
     MICRO_PRE_CLUSTERED_GROUP_BY,
     NESTED_LOOP,
     NESTED_TUPLE_SOURCE,
     ONE_TO_ONE_EXCHANGE,
-    PRE_SORTED_DISTINCT_BY,
+    PARTITIONINGSPLIT,
     PRE_CLUSTERED_GROUP_BY,
-    RANGE_PARTITION_EXCHANGE,
+    PRE_SORTED_DISTINCT_BY,
     RANDOM_MERGE_EXCHANGE,
+    RANGE_PARTITION_EXCHANGE,
     RTREE_SEARCH,
     RUNNING_AGGREGATE,
-    SORT_GROUP_BY,
-    SORT_MERGE_EXCHANGE,
+    SINGLE_PARTITION_INVERTED_INDEX_SEARCH,
     SINK,
     SINK_WRITE,
+    SORT_GROUP_BY,
+    SORT_MERGE_EXCHANGE,
     SPLIT,
     STABLE_SORT,
+    STATS,
     STREAM_LIMIT,
-    STREAM_SELECT,
     STREAM_PROJECT,
+    STREAM_SELECT,
     STRING_STREAM_SCRIPT,
     SUBPLAN,
+    TOKENIZE,
     UNION_ALL,
     UNNEST,
-    WRITE_RESULT,
-    INSERT_DELETE,
-    INDEX_INSERT_DELETE,
     UPDATE,
-    SINGLE_PARTITION_INVERTED_INDEX_SEARCH,
-    LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH,
-    PARTITIONINGSPLIT,
-    EXTENSION_OPERATOR,
-    EXTERNAL_LOOKUP,
-    TOKENIZE
+    WRITE_RESULT,
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/MaterializeOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/MaterializeOperator.java
new file mode 100644
index 0000000..5cecf07
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/MaterializeOperator.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class MaterializeOperator extends AbstractLogicalOperator {
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.MATERIALIZE;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitMaterializeOperator(this, arg);
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        return createPropagatingAllInputsTypeEnvironment(ctx);
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return VariablePropagationPolicy.ALL;
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    @Override
+    public void recomputeSchema() throws AlgebricksException {
+        schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
+    }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
index 3f13977..2c5bd54 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
@@ -89,6 +89,10 @@
         return this.outputArity = outputArity;
     }
 
+    public void setOutputMaterializationFlags(boolean[] outputMaterializationFlags) {
+        this.outputMaterializationFlags = outputMaterializationFlags;
+    }
+
     public boolean[] getOutputMaterializationFlags() {
         return outputMaterializationFlags;
     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 266ead8..fcccee1 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -57,6 +57,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
@@ -404,6 +405,12 @@
     }
 
     @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
+    @Override
     public Void visitRunningAggregateOperator(RunningAggregateOperator op, IOptimizationContext ctx)
             throws AlgebricksException {
         ctx.putEquivalenceClassMap(op, new HashMap<LogicalVariable, EquivalenceClass>());
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 804c0e7..be83320 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -48,6 +48,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
@@ -272,6 +273,14 @@
     }
 
     @Override
+    public Boolean visitMaterializeOperator(MaterializeOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.MATERIALIZE)
+            return Boolean.FALSE;
+        return Boolean.TRUE;
+    }
+
+    @Override
     public Boolean visitScriptOperator(ScriptOperator op, ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
         if (aop.getOperatorTag() != LogicalOperatorTag.SCRIPT)
@@ -706,6 +715,11 @@
         }
 
         @Override
+        public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+            return new MaterializeOperator();
+        }
+
+        @Override
         public ILogicalOperator visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
             ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
             ArrayList<LogicalVariable> newOutputList = new ArrayList<LogicalVariable>();
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index ea8e82b..99df45c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -47,6 +47,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
@@ -170,6 +171,12 @@
     }
 
     @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, ILogicalOperator arg) throws AlgebricksException {
+        mapVariablesStandard(op, arg);
+        return null;
+    }
+
+    @Override
     public Void visitScriptOperator(ScriptOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 388d9f9..19fcf65 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -39,6 +39,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
@@ -187,6 +188,12 @@
     }
 
     @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext arg) throws AlgebricksException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
     public Void visitRunningAggregateOperator(RunningAggregateOperator op, IOptimizationContext context)
             throws AlgebricksException {
         visitAssignment(op, context);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index bf184e9..1fa97a6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -42,6 +42,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
@@ -235,6 +236,11 @@
     }
 
     @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
         return null;
     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index b31bc3b..0b37b92 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -43,6 +43,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
@@ -271,6 +272,12 @@
     }
 
     @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+        standardLayout(op);
+        return null;
+    }
+
+    @Override
     public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
         standardLayout(op);
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index b2d6022..227cd53 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -43,6 +43,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
@@ -377,6 +378,12 @@
     }
 
     @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Void visitInsertDeleteOperator(InsertDeleteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
         op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 0209492..8796da4 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -42,6 +42,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
@@ -379,6 +380,11 @@
     }
 
     @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
         op.getDelegate().getUsedVariables(usedVariables);
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
new file mode 100644
index 0000000..27f4b25
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor;
+
+public class MaterializePOperator extends AbstractPhysicalOperator {
+
+    private final boolean isSingleActivity;
+
+    public MaterializePOperator(boolean isSingleActivity) {
+        this.isSingleActivity = isSingleActivity;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.MATERIALIZE;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
+        MaterializingOperatorDescriptor materializationOpDesc = new MaterializingOperatorDescriptor(
+                builder.getJobSpec(), recDescriptor, isSingleActivity);
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, materializationOpDesc);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        int[] inputDependencyLabels = new int[] { 0 };
+        int[] outputDependencyLabels;
+        if (isSingleActivity) {
+            outputDependencyLabels = new int[] { 0 };
+        } else {
+            outputDependencyLabels = new int[] { 1 };
+        }
+        return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index a118ee8..2c924d5 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -41,6 +41,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
@@ -60,8 +61,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
-public class LogicalOperatorPrettyPrintVisitor implements
-        ILogicalOperatorVisitor<String, Integer> {
+public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisitor<String, Integer> {
 
     ILogicalExpressionVisitor<String, Integer> exprVisitor;
 
@@ -69,42 +69,35 @@
         exprVisitor = new LogicalExpressionPrettyPrintVisitor();
     }
 
-    public LogicalOperatorPrettyPrintVisitor(
-            ILogicalExpressionVisitor<String, Integer> exprVisitor) {
+    public LogicalOperatorPrettyPrintVisitor(ILogicalExpressionVisitor<String, Integer> exprVisitor) {
         this.exprVisitor = exprVisitor;
     }
 
     @Override
-    public String visitAggregateOperator(AggregateOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitAggregateOperator(AggregateOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("aggregate ")
-                .append(op.getVariables()).append(" <- ");
+        addIndent(buffer, indent).append("aggregate ").append(op.getVariables()).append(" <- ");
         pprintExprList(op.getExpressions(), buffer, indent);
         return buffer.toString();
     }
 
     @Override
-    public String visitRunningAggregateOperator(RunningAggregateOperator op,
-            Integer indent) throws AlgebricksException {
+    public String visitRunningAggregateOperator(RunningAggregateOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("running-aggregate ")
-                .append(op.getVariables()).append(" <- ");
+        addIndent(buffer, indent).append("running-aggregate ").append(op.getVariables()).append(" <- ");
         pprintExprList(op.getExpressions(), buffer, indent);
         return buffer.toString();
     }
 
     @Override
-    public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op,
-            Integer indent) {
+    public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("empty-tuple-source");
         return buffer.toString();
     }
 
     @Override
-    public String visitGroupByOperator(GroupByOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitGroupByOperator(GroupByOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("group by (");
         pprintVeList(buffer, op.getGroupByList(), indent);
@@ -116,8 +109,7 @@
     }
 
     @Override
-    public String visitDistinctOperator(DistinctOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitDistinctOperator(DistinctOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("distinct " + "(");
         pprintExprList(op.getExpressions(), buffer, indent);
@@ -126,75 +118,62 @@
     }
 
     @Override
-    public String visitInnerJoinOperator(InnerJoinOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitInnerJoinOperator(InnerJoinOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent)
-                .append("join (")
-                .append(op.getCondition().getValue()
-                        .accept(exprVisitor, indent)).append(")");
+        addIndent(buffer, indent).append("join (").append(op.getCondition().getValue().accept(exprVisitor, indent))
+                .append(")");
         return buffer.toString();
     }
 
     @Override
-    public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op,
-            Integer indent) throws AlgebricksException {
+    public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent)
-                .append("left outer join (")
-                .append(op.getCondition().getValue()
-                        .accept(exprVisitor, indent)).append(")");
+        addIndent(buffer, indent).append("left outer join (")
+                .append(op.getCondition().getValue().accept(exprVisitor, indent)).append(")");
         return buffer.toString();
     }
 
     @Override
-    public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op,
-            Integer indent) {
+    public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("nested tuple source");
         return buffer.toString();
     }
 
     @Override
-    public String visitOrderOperator(OrderOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitOrderOperator(OrderOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("order ");
-        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op
-                .getOrderExpressions()) {
+        for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
             String fst;
             switch (p.first.getKind()) {
-            case ASC: {
-                fst = "ASC";
-                break;
+                case ASC: {
+                    fst = "ASC";
+                    break;
+                }
+                case DESC: {
+                    fst = "DESC";
+                    break;
+                }
+                default: {
+                    fst = p.first.getExpressionRef().toString();
+                }
             }
-            case DESC: {
-                fst = "DESC";
-                break;
-            }
-            default: {
-                fst = p.first.getExpressionRef().toString();
-            }
-            }
-            buffer.append("(" + fst + ", "
-                    + p.second.getValue().accept(exprVisitor, indent) + ") ");
+            buffer.append("(" + fst + ", " + p.second.getValue().accept(exprVisitor, indent) + ") ");
         }
         return buffer.toString();
     }
 
     @Override
-    public String visitAssignOperator(AssignOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitAssignOperator(AssignOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("assign ").append(op.getVariables())
-                .append(" <- ");
+        addIndent(buffer, indent).append("assign ").append(op.getVariables()).append(" <- ");
         pprintExprList(op.getExpressions(), buffer, indent);
         return buffer.toString();
     }
 
     @Override
-    public String visitWriteOperator(WriteOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitWriteOperator(WriteOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("write ");
         pprintExprList(op.getExpressions(), buffer, indent);
@@ -202,8 +181,7 @@
     }
 
     @Override
-    public String visitDistributeResultOperator(DistributeResultOperator op,
-            Integer indent) throws AlgebricksException {
+    public String visitDistributeResultOperator(DistributeResultOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("distribute result ");
         pprintExprList(op.getExpressions(), buffer, indent);
@@ -211,42 +189,32 @@
     }
 
     @Override
-    public String visitWriteResultOperator(WriteResultOperator op,
-            Integer indent) throws AlgebricksException {
+    public String visitWriteResultOperator(WriteResultOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent)
-                .append("load ")
-                .append(op.getDataSource())
-                .append(" from ")
-                .append(op.getPayloadExpression().getValue()
-                        .accept(exprVisitor, indent))
-                .append(" partitioned by ");
+        addIndent(buffer, indent).append("load ").append(op.getDataSource()).append(" from ")
+                .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
         pprintExprList(op.getKeyExpressions(), buffer, indent);
         return buffer.toString();
     }
 
     @Override
-    public String visitSelectOperator(SelectOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitSelectOperator(SelectOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent)
-                .append("select (")
-                .append(op.getCondition().getValue()
-                        .accept(exprVisitor, indent)).append(")");
+        addIndent(buffer, indent).append("select (").append(op.getCondition().getValue().accept(exprVisitor, indent))
+                .append(")");
         return buffer.toString();
     }
 
     @Override
     public String visitProjectOperator(ProjectOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append(
-                "project " + "(" + op.getVariables() + ")");
+        addIndent(buffer, indent).append("project " + "(" + op.getVariables() + ")");
         return buffer.toString();
     }
 
     @Override
-    public String visitPartitioningSplitOperator(PartitioningSplitOperator op,
-            Integer indent) throws AlgebricksException {
+    public String visitPartitioningSplitOperator(PartitioningSplitOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("partitioning-split (");
         pprintExprList(op.getExpressions(), buffer, indent);
@@ -255,8 +223,7 @@
     }
 
     @Override
-    public String visitSubplanOperator(SubplanOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitSubplanOperator(SubplanOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("subplan {");
         printNestedPlans(op, indent, buffer);
@@ -267,58 +234,44 @@
     public String visitUnionOperator(UnionAllOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("union");
-        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> v : op
-                .getVariableMappings()) {
-            buffer.append(" (" + v.first + ", " + v.second + ", " + v.third
-                    + ")");
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> v : op.getVariableMappings()) {
+            buffer.append(" (" + v.first + ", " + v.second + ", " + v.third + ")");
         }
         return buffer.toString();
     }
 
     @Override
-    public String visitUnnestOperator(UnnestOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitUnnestOperator(UnnestOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("unnest " + op.getVariable());
         if (op.getPositionalVariable() != null) {
             buffer.append(" at " + op.getPositionalVariable());
         }
-        buffer.append(" <- "
-                + op.getExpressionRef().getValue().accept(exprVisitor, indent));
+        buffer.append(" <- " + op.getExpressionRef().getValue().accept(exprVisitor, indent));
         return buffer.toString();
     }
 
     @Override
-    public String visitUnnestMapOperator(UnnestMapOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitUnnestMapOperator(UnnestMapOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append(
-                "unnest-map "
-                        + op.getVariables()
-                        + " <- "
-                        + op.getExpressionRef().getValue()
-                                .accept(exprVisitor, indent));
+                "unnest-map " + op.getVariables() + " <- "
+                        + op.getExpressionRef().getValue().accept(exprVisitor, indent));
         return buffer.toString();
     }
 
     @Override
-    public String visitDataScanOperator(DataSourceScanOperator op,
-            Integer indent) {
+    public String visitDataScanOperator(DataSourceScanOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append(
-                "data-scan " + op.getProjectVariables() + "<-"
-                        + op.getVariables() + " <- " + op.getDataSource());
+                "data-scan " + op.getProjectVariables() + "<-" + op.getVariables() + " <- " + op.getDataSource());
         return buffer.toString();
     }
 
     @Override
-    public String visitLimitOperator(LimitOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitLimitOperator(LimitOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append(
-                "limit "
-                        + op.getMaxObjects().getValue()
-                                .accept(exprVisitor, indent));
+        addIndent(buffer, indent).append("limit " + op.getMaxObjects().getValue().accept(exprVisitor, indent));
         ILogicalExpression offset = op.getOffset().getValue();
         if (offset != null) {
             buffer.append(", " + offset.accept(exprVisitor, indent));
@@ -337,32 +290,30 @@
     public String visitScriptOperator(ScriptOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append(
-                "script (in: " + op.getInputVariables() + ") (out: "
-                        + op.getOutputVariables() + ")");
+                "script (in: " + op.getInputVariables() + ") (out: " + op.getOutputVariables() + ")");
         return buffer.toString();
     }
 
     @Override
-    public String visitReplicateOperator(ReplicateOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitReplicateOperator(ReplicateOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("replicate ");
         return buffer.toString();
     }
 
     @Override
-    public String visitInsertDeleteOperator(InsertDeleteOperator op,
-            Integer indent) throws AlgebricksException {
+    public String visitMaterializeOperator(MaterializeOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        String header = op.getOperation() == Kind.INSERT ? "insert into "
-                : "delete from ";
-        addIndent(buffer, indent)
-                .append(header)
-                .append(op.getDataSource())
-                .append(" from ")
-                .append(op.getPayloadExpression().getValue()
-                        .accept(exprVisitor, indent))
-                .append(" partitioned by ");
+        addIndent(buffer, indent).append("materialize ");
+        return buffer.toString();
+    }
+
+    @Override
+    public String visitInsertDeleteOperator(InsertDeleteOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
+        addIndent(buffer, indent).append(header).append(op.getDataSource()).append(" from ")
+                .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
         pprintExprList(op.getPrimaryKeyExpressions(), buffer, indent);
         if (op.isBulkload()) {
             buffer.append(" [bulkload]");
@@ -371,14 +322,12 @@
     }
 
     @Override
-    public String visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op,
-            Integer indent) throws AlgebricksException {
+    public String visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Integer indent)
+            throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        String header = op.getOperation() == Kind.INSERT ? "insert into "
-                : "delete from ";
-        addIndent(buffer, indent).append(header).append(op.getIndexName())
-                .append(" on ").append(op.getDataSourceIndex().getDataSource())
-                .append(" from ");
+        String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
+        addIndent(buffer, indent).append(header).append(op.getIndexName()).append(" on ")
+                .append(op.getDataSourceIndex().getDataSource()).append(" from ");
         pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
         if (op.isBulkload()) {
             buffer.append(" [bulkload]");
@@ -387,41 +336,36 @@
     }
 
     @Override
-    public String visitTokenizeOperator(TokenizeOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitTokenizeOperator(TokenizeOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("tokenize ")
-                .append(op.getTokenizeVars()).append(" <- ");
+        addIndent(buffer, indent).append("tokenize ").append(op.getTokenizeVars()).append(" <- ");
         pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
         return buffer.toString();
     }
 
     @Override
-    public String visitSinkOperator(SinkOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitSinkOperator(SinkOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("sink");
         return buffer.toString();
     }
 
     @Override
-    public String visitExtensionOperator(ExtensionOperator op, Integer indent)
-            throws AlgebricksException {
+    public String visitExtensionOperator(ExtensionOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append(op.toString());
         return buffer.toString();
     }
 
-    protected static final StringBuilder addIndent(StringBuilder buffer,
-            int level) {
+    protected static final StringBuilder addIndent(StringBuilder buffer, int level) {
         for (int i = 0; i < level; ++i) {
             buffer.append(' ');
         }
         return buffer;
     }
 
-    protected void printNestedPlans(AbstractOperatorWithNestedPlans op,
-            Integer indent, StringBuilder buffer) throws AlgebricksException {
+    protected void printNestedPlans(AbstractOperatorWithNestedPlans op, Integer indent, StringBuilder buffer)
+            throws AlgebricksException {
         boolean first = true;
         if (op.getNestedPlans().isEmpty()) {
             buffer.append("}");
@@ -441,9 +385,8 @@
         }
     }
 
-    protected void pprintExprList(
-            List<Mutable<ILogicalExpression>> expressions,
-            StringBuilder buffer, Integer indent) throws AlgebricksException {
+    protected void pprintExprList(List<Mutable<ILogicalExpression>> expressions, StringBuilder buffer, Integer indent)
+            throws AlgebricksException {
         buffer.append("[");
         boolean first = true;
         for (Mutable<ILogicalExpression> exprRef : expressions) {
@@ -457,9 +400,7 @@
         buffer.append("]");
     }
 
-    protected void pprintVeList(
-            StringBuilder sb,
-            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList,
+    protected void pprintVeList(StringBuilder sb, List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList,
             Integer indent) throws AlgebricksException {
         sb.append("[");
         boolean fst = true;
@@ -479,13 +420,11 @@
     }
 
     @Override
-    public String visitExternalDataLookupOperator(
-            ExternalDataLookupOperator op, Integer indent)
+    public String visitExternalDataLookupOperator(ExternalDataLookupOperator op, Integer indent)
             throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append(
-                "external-instant-lookup " + op.getVariables() + " <- "
-                        + op.getExpressionRef().getValue());
+                "external-instant-lookup " + op.getVariables() + " <- " + op.getExpressionRef().getValue());
         return buffer.toString();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index c753042..359e243 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
@@ -79,6 +80,8 @@
 
     public R visitReplicateOperator(ReplicateOperator op, T arg) throws AlgebricksException;
 
+    public R visitMaterializeOperator(MaterializeOperator op, T arg) throws AlgebricksException;
+
     public R visitScriptOperator(ScriptOperator op, T arg) throws AlgebricksException;
 
     public R visitSubplanOperator(SubplanOperator op, T arg) throws AlgebricksException;
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 691006f..3da9e76 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -18,7 +18,6 @@
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -487,7 +486,7 @@
         }
     }
 
-    private boolean worthMaterialization(Mutable<ILogicalOperator> candidate) {
+    protected boolean worthMaterialization(Mutable<ILogicalOperator> candidate) {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) candidate.getValue();
         if (aop.getPhysicalOperator().expensiveThanMaterialization()) {
             return true;
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/NestedSubplanToJoinRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/NestedSubplanToJoinRule.java
new file mode 100644
index 0000000..6484202
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/NestedSubplanToJoinRule.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * replace Subplan operators with nested loop joins where the join condition is true, if the Subplan
+ * does not contain free variables (does not have correlations to the input stream).
+ * 
+ * @author yingyib
+ */
+public class NestedSubplanToJoinRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        if (context.checkIfInDontApplySet(this, opRef.getValue()))
+            return false;
+        context.addToDontApplySet(this, opRef.getValue());
+
+        ILogicalOperator op1 = opRef.getValue();
+        if (op1.getInputs().size() == 0) {
+            return false;
+        }
+
+        boolean rewritten = false;
+        for (int index = 0; index < op1.getInputs().size(); index++) {
+            AbstractLogicalOperator child = (AbstractLogicalOperator) op1.getInputs().get(index).getValue();
+            if (child.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+                continue;
+            }
+
+            AbstractOperatorWithNestedPlans subplan = (AbstractOperatorWithNestedPlans) child;
+            Set<LogicalVariable> freeVars = new HashSet<LogicalVariable>();
+            OperatorPropertiesUtil.getFreeVariablesInSubplans(subplan, freeVars);
+            if (!freeVars.isEmpty()) {
+                /**
+                 * the subplan is correlated with the outer plan, other rules can deal with it
+                 */
+                continue;
+            }
+
+            /** get the input operator of the subplan operator */
+            ILogicalOperator subplanInput = subplan.getInputs().get(0).getValue();
+            AbstractLogicalOperator subplanInputOp = (AbstractLogicalOperator) subplanInput;
+
+            /** If the other join branch is a trivial plan, do not do the rewriting. */
+            if (subplanInputOp.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
+                continue;
+            }
+
+            /** get all nested top operators */
+            List<ILogicalPlan> nestedPlans = subplan.getNestedPlans();
+            List<Mutable<ILogicalOperator>> nestedRoots = new ArrayList<Mutable<ILogicalOperator>>();
+            for (ILogicalPlan nestedPlan : nestedPlans) {
+                nestedRoots.addAll(nestedPlan.getRoots());
+            }
+            if (nestedRoots.size() == 0) {
+                /** there is no nested top operators */
+                continue;
+            }
+
+            /** expend the input and roots into a DAG of nested loop joins */
+            Mutable<ILogicalExpression> expr = new MutableObject<ILogicalExpression>(ConstantExpression.TRUE);
+            Mutable<ILogicalOperator> nestedRootRef = nestedRoots.get(0);
+            ILogicalOperator join = new LeftOuterJoinOperator(expr, new MutableObject<ILogicalOperator>(subplanInput),
+                    nestedRootRef);
+
+            /** rewrite the nested tuple source to be empty tuple source */
+            rewriteNestedTupleSource(nestedRootRef);
+
+            for (int i = 1; i < nestedRoots.size(); i++) {
+                join = new LeftOuterJoinOperator(expr, new MutableObject<ILogicalOperator>(join), nestedRoots.get(i));
+            }
+            op1.getInputs().get(index).setValue(join);
+            context.computeAndSetTypeEnvironmentForOperator(join);
+            rewritten = true;
+        }
+        return rewritten;
+    }
+
+    /**
+     * rewrite NestedTupleSource operators to EmptyTupleSource operators
+     * 
+     * @param nestedRootRef
+     */
+    private void rewriteNestedTupleSource(Mutable<ILogicalOperator> nestedRootRef) {
+        AbstractLogicalOperator nestedRoot = (AbstractLogicalOperator) nestedRootRef.getValue();
+        if (nestedRoot.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+            nestedRootRef.setValue(new EmptyTupleSourceOperator());
+        }
+        List<Mutable<ILogicalOperator>> inputs = nestedRoot.getInputs();
+        for (Mutable<ILogicalOperator> input : inputs) {
+            rewriteNestedTupleSource(input);
+        }
+    }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushMapOperatorDownThroughProductRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushMapOperatorDownThroughProductRule.java
new file mode 100644
index 0000000..16a71a4
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushMapOperatorDownThroughProductRule.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class PushMapOperatorDownThroughProductRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        if (!op1.isMap()) {
+            return false;
+        }
+        Mutable<ILogicalOperator> op2Ref = op1.getInputs().get(0);
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op2Ref.getValue();
+        if (op2.getOperatorTag() != LogicalOperatorTag.INNERJOIN) {
+            return false;
+        }
+        AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op2;
+        if (!OperatorPropertiesUtil.isAlwaysTrueCond(join.getCondition().getValue())) {
+            return false;
+        }
+
+        List<LogicalVariable> used = new ArrayList<LogicalVariable>();
+        VariableUtilities.getUsedVariables(op1, used);
+
+        Mutable<ILogicalOperator> b0Ref = op2.getInputs().get(0);
+        ILogicalOperator b0 = b0Ref.getValue();
+        List<LogicalVariable> b0Scm = new ArrayList<LogicalVariable>();
+        VariableUtilities.getLiveVariables(b0, b0Scm);
+        if (b0Scm.containsAll(used)) {
+            // push operator on left branch
+            op2Ref.setValue(b0);
+            b0Ref.setValue(op1);
+            opRef.setValue(op2);
+            return true;
+        } else {
+            Mutable<ILogicalOperator> b1Ref = op2.getInputs().get(1);
+            ILogicalOperator b1 = b1Ref.getValue();
+            List<LogicalVariable> b1Scm = new ArrayList<LogicalVariable>();
+            VariableUtilities.getLiveVariables(b1, b1Scm);
+            if (b1Scm.containsAll(used)) {
+                // push operator on right branch
+                op2Ref.setValue(b1);
+                b1Ref.setValue(op1);
+                opRef.setValue(op2);
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
index e1c8463..f6c6ad8 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
@@ -33,6 +33,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -76,16 +77,20 @@
         Mutable<ILogicalOperator> tupleSourceOpRef = currentOpRef;
         currentOpRef = opRef;
         if (tupleSourceOpRef.getValue().getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
-            while (currentOpRef.getValue().getInputs().size() == 1
-                    && currentOpRef.getValue() instanceof AbstractScanOperator
-                    && descOrSelfIsSourceScan((AbstractLogicalOperator) currentOpRef.getValue())) {
-                if (opsAreIndependent(currentOpRef.getValue(), tupleSourceOpRef.getValue())) {
-                    /** move down the boundary if the operator is independent of the tuple source */
-                    boundaryOpRef = currentOpRef.getValue().getInputs().get(0);
-                } else {
-                    break;
+            NestedTupleSourceOperator nts = (NestedTupleSourceOperator) tupleSourceOpRef.getValue();
+            // If the subplan input is a trivial plan, do not do the rewriting.
+            if (nts.getSourceOperator().getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
+                while (currentOpRef.getValue().getInputs().size() == 1
+                        && currentOpRef.getValue() instanceof AbstractScanOperator
+                        && descOrSelfIsSourceScan((AbstractLogicalOperator) currentOpRef.getValue())) {
+                    if (opsAreIndependent(currentOpRef.getValue(), tupleSourceOpRef.getValue())) {
+                        /** move down the boundary if the operator is independent of the tuple source */
+                        boundaryOpRef = currentOpRef.getValue().getInputs().get(0);
+                    } else {
+                        break;
+                    }
+                    currentOpRef = currentOpRef.getValue().getInputs().get(0);
                 }
-                currentOpRef = currentOpRef.getValue().getInputs().get(0);
             }
         }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 1bfa413..8887b82 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -109,8 +109,8 @@
     public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             int recordsPerFrame, double factor, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1)
-            throws HyracksDataException {
+            RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter,
+            INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
         super(spec, 2, 1);
         this.memsize = memsize;
         this.inputsize0 = inputsize0;
@@ -181,7 +181,8 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) throws HyracksDataException {
+                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+                throws HyracksDataException {
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(joinAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
@@ -194,7 +195,8 @@
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
-            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
+                    .createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
@@ -384,7 +386,8 @@
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
-            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
+                    .createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private BuildAndPartitionTaskState state;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 744f939..5cab373 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -115,24 +115,23 @@
     private final ITuplePairComparatorFactory tuplePairComparatorFactory0; //For NLJ in probe
     private final ITuplePairComparatorFactory tuplePairComparatorFactory1; //For NLJ in probe
     private final IPredicateEvaluatorFactory predEvaluatorFactory;
-    
+
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
-    
-    	//Flags added for test purpose
+
+    //Flags added for test purpose
     private static boolean skipInMemoryHJ = false;
     private static boolean forceNLJ = false;
     private static boolean forceRR = false;
-    
-    
+
     private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoinOperatorDescriptor.class.getName());
 
     public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
             ITuplePairComparatorFactory tupPaircomparatorFactory0,
-            ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory, boolean isLeftOuter,
-            INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+            ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory,
+            boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
 
         super(spec, 2, 1);
         this.memsize = memsize;
@@ -153,7 +152,8 @@
     public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
-            ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory)
+            ITuplePairComparatorFactory tupPaircomparatorFactory0,
+            ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory)
             throws HyracksDataException {
 
         super(spec, 2, 1);
@@ -214,7 +214,7 @@
     }
 
     public static class BuildAndPartitionTaskState extends AbstractStateObject {
-    	
+
         private int memForJoin;
         private int numOfPartitions;
         private OptimizedHybridHashJoin hybridHJ;
@@ -266,10 +266,10 @@
             for (int i = 0; i < comparatorFactories.length; i++) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
-            
-            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
-            
-            
+
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
+                    .createPredicateEvaluator());
+
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
                         .getJobId(), new TaskId(getActivityId(), partition));
@@ -287,17 +287,16 @@
                     state.memForJoin = memsize - 2;
                     state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor,
                             nPartitions);
-                    if(!isLeftOuter){
-                    	state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+                    if (!isLeftOuter) {
+                        state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
                                 PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
                                 buildHpc, predEvaluator);
-                    }
-                    else{
-                    	state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+                    } else {
+                        state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
                                 PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
                                 buildHpc, predEvaluator, isLeftOuter, nullWriterFactories1);
                     }
-                    
+
                     state.hybridHJ.initBuild();
                 }
 
@@ -331,7 +330,7 @@
      * Hybrid Hash Join recursively on them.
      */
     private class ProbeAndJoinActivityNode extends AbstractActivityNode {
-    	
+
         private static final long serialVersionUID = 1L;
 
         private final ActivityId buildAid;
@@ -351,8 +350,9 @@
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator(ctx);
             final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator(ctx);
-            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
-            
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
+                    .createPredicateEvaluator());
+
             for (int i = 0; i < comparatorFactories.length; i++) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
@@ -388,7 +388,7 @@
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                	state.hybridHJ.probe(buffer, writer);
+                    state.hybridHJ.probe(buffer, writer);
                 }
 
                 @Override
@@ -425,17 +425,22 @@
                 }
 
                 private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
-                        RunFileReader probeSideReader, int pid, int beforeMax, int level, boolean wasReversed) throws HyracksDataException {
+                        RunFileReader probeSideReader, int pid, int beforeMax, int level, boolean wasReversed)
+                        throws HyracksDataException {
                     ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
                             hashFunctionGeneratorFactories).createPartitioner(level);
                     ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
                             hashFunctionGeneratorFactories).createPartitioner(level);
-                    
-                    long buildPartSize = wasReversed ? (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize()) : (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize());
-                    long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize()) : (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize());
-                    
-                    LOGGER.fine("\n>>>Joining Partition Pairs (pid "+pid+") - (level "+level+") - wasReversed "+wasReversed+" - BuildSize:\t"+buildPartSize+"\tProbeSize:\t"+probePartSize+" - MemForJoin "+(state.memForJoin)+"  - LeftOuter is "+isLeftOuter);
-                                      
+
+                    long buildPartSize = wasReversed ? (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize()) : (ohhj
+                            .getBuildPartitionSize(pid) / ctx.getFrameSize());
+                    long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize()) : (ohhj
+                            .getProbePartitionSize(pid) / ctx.getFrameSize());
+
+                    LOGGER.fine("\n>>>Joining Partition Pairs (pid " + pid + ") - (level " + level + ") - wasReversed "
+                            + wasReversed + " - BuildSize:\t" + buildPartSize + "\tProbeSize:\t" + probePartSize
+                            + " - MemForJoin " + (state.memForJoin) + "  - LeftOuter is " + isLeftOuter);
+
                     //Apply in-Mem HJ if possible
                     if (!skipInMemoryHJ && (buildPartSize < state.memForJoin)
                             || (probePartSize < state.memForJoin && !isLeftOuter)) {
@@ -468,14 +473,16 @@
                     }
                     //Apply (Recursive) HHJ
                     else {
-                    	LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level "+level+"]");
+                        LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]");
                         OptimizedHybridHashJoin rHHj;
                         if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
-                        	LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
+                            LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+                                    + level + "]");
                             int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
                                     nPartitions);
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
-                                    probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator); //checked-confirmed
+                                    probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc,
+                                    predEvaluator); //checked-confirmed
 
                             buildSideReader.open();
                             rHHj.initBuild();
@@ -500,8 +507,9 @@
                                     : maxAfterProbeSize;
 
                             BitSet rPStatus = rHHj.getPartitinStatus();
-                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {	//Case 2.1.1 - Keep applying HHJ
-                            	LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
+                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
+                                LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+                                        + level + "]");
                                 for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -514,11 +522,12 @@
                                 }
 
                             } else { //Case 2.1.2 - Switch to NLJ
-                            	LOGGER.fine("\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
-                            	for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                LOGGER.fine("\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+                                        + level + "]");
+                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-                                    
+
                                     if (rbrfw == null || rprfw == null) {
                                         continue;
                                     }
@@ -526,22 +535,23 @@
                                     int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
                                     int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
                                     if (isLeftOuter || buildSideInTups < probeSideInTups) {
-                                        applyNestedLoopJoin(buildRd, probeRd, memsize, rprfw, rbrfw,
-                                                nljComparator0, false); //checked-modified
+                                        applyNestedLoopJoin(buildRd, probeRd, memsize, rprfw, rbrfw, nljComparator0,
+                                                false); //checked-modified
                                     } else {
-                                        applyNestedLoopJoin(probeRd, buildRd, memsize, rbrfw, rprfw,
-                                                nljComparator1, true); //checked-modified
+                                        applyNestedLoopJoin(probeRd, buildRd, memsize, rbrfw, rprfw, nljComparator1,
+                                                true); //checked-modified
                                     }
                                 }
                             }
                         } else { //Case 2.2 - Recursive HHJ (with Role-Reversal)
-                        	LOGGER.fine("\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
+                            LOGGER.fine("\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level " + level + "]");
                             int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
                                     nPartitions);
-                            
+
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
-                                    buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator);	//checked-confirmed
-                            rHHj.setIsReversed(true);	//Added to use predicateEvaluator (for inMemoryHashJoin) correctly
+                                    buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc,
+                                    predEvaluator); //checked-confirmed
+                            rHHj.setIsReversed(true); //Added to use predicateEvaluator (for inMemoryHashJoin) correctly
 
                             probeSideReader.open();
                             rHHj.initBuild();
@@ -563,8 +573,9 @@
                                     : maxAfterProbeSize;
                             BitSet rPStatus = rHHj.getPartitinStatus();
 
-                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {	//Case 2.2.1 - Keep applying HHJ
-                            	LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
+                            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
+                                LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "
+                                        + level + "]");
                                 for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -573,14 +584,15 @@
                                         continue;
                                     }
 
-                                    joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1), true);	//checked-confirmed
+                                    joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1), true); //checked-confirmed
                                 }
                             } else { //Case 2.2.2 - Switch to NLJ
-                            	LOGGER.fine("\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
-                            	for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+                                LOGGER.fine("\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "
+                                        + level + "]");
+                                for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                                     RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-                                    
+
                                     if (rbrfw == null || rprfw == null) {
                                         continue;
                                     }
@@ -588,11 +600,11 @@
                                     long buildSideSize = rbrfw.getFileSize();
                                     long probeSideSize = rprfw.getFileSize();
                                     if (buildSideSize > probeSideSize) {
-                                        applyNestedLoopJoin(buildRd, probeRd, memsize, rbrfw, rprfw,
-                                                nljComparator0, true);	//checked-modified
+                                        applyNestedLoopJoin(buildRd, probeRd, memsize, rbrfw, rprfw, nljComparator0,
+                                                true); //checked-modified
                                     } else {
-                                        applyNestedLoopJoin(probeRd, buildRd, memsize, rprfw, rbrfw,
-                                                nljComparator1, true);	//checked-modified
+                                        applyNestedLoopJoin(probeRd, buildRd, memsize, rprfw, rbrfw, nljComparator1,
+                                                true); //checked-modified
                                     }
                                 }
                             }
@@ -604,8 +616,8 @@
 
                 private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
                         RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
-                        ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader, boolean reverse, int pid)
-                        throws HyracksDataException {
+                        ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader,
+                        boolean reverse, int pid) throws HyracksDataException {
                     ISerializableTable table = new SerializableHashTable(tabSize, ctx);
                     InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
                             ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
@@ -623,7 +635,7 @@
                     }
                     bReader.close();
                     rPartbuff.clear();
-                    	// probe
+                    // probe
                     pReader.open();
                     while (pReader.nextFrame(rPartbuff)) {
                         joiner.join(rPartbuff, writer);
@@ -634,13 +646,13 @@
                 }
 
                 private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
-                        RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator, boolean reverse)
-                        throws HyracksDataException {
-                	NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
-                            new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, isLeftOuter, nullWriters1);
-                	nlj.setIsReversed(reverse);
-                	
-                	
+                        RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator,
+                        boolean reverse) throws HyracksDataException {
+                    NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
+                            new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize,
+                            predEvaluator, isLeftOuter, nullWriters1);
+                    nlj.setIsReversed(reverse);
+
                     ByteBuffer cacheBuff = ctx.allocateFrame();
                     innerReader.open();
                     while (innerReader.nextFrame(cacheBuff)) {
@@ -667,16 +679,16 @@
             return op;
         }
     }
-    
-    public void setSkipInMemHJ(boolean b){
-    	skipInMemoryHJ = b;
+
+    public void setSkipInMemHJ(boolean b) {
+        skipInMemoryHJ = b;
     }
-    
-    public void setForceNLJ(boolean b){
-    	forceNLJ = b;
+
+    public void setForceNLJ(boolean b) {
+        forceNLJ = b;
     }
-    
-    public void setForceRR(boolean b){
-    	forceRR = (!isLeftOuter && b);
+
+    public void setForceRR(boolean b) {
+        forceRR = (!isLeftOuter && b);
     }
 }
\ No newline at end of file