ASTERIXDB-1168: fix queries with duplicates in a subplan's input.

Fixed PushSelectDown to consider stateful functions;
Added a primary key visitor to generate/progate primary key information.

Change-Id: I83907c29699a76540abd1a246776f55576eeced8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/695
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 24578e6..6badac7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -56,7 +56,7 @@
 
     public abstract void addPrimaryKey(FunctionalDependency pk);
 
-    public abstract List<LogicalVariable> findPrimaryKey(LogicalVariable recordVar);
+    public abstract List<LogicalVariable> findPrimaryKey(LogicalVariable var);
 
     public abstract void putEquivalenceClassMap(ILogicalOperator op, Map<LogicalVariable, EquivalenceClass> eqClassMap);
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/StatefulFunctionCallExpression.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/StatefulFunctionCallExpression.java
index eeaf6c3..fe91ed3 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/StatefulFunctionCallExpression.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/StatefulFunctionCallExpression.java
@@ -21,7 +21,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
@@ -66,4 +65,8 @@
     public <R, T> R accept(ILogicalExpressionVisitor<R, T> visitor, T arg) throws AlgebricksException {
         return visitor.visitStatefulFunctionCallExpression(this, arg);
     }
+
+    public IPropertiesComputer getPropertiesComputer() {
+        return propertiesComputer;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsExpressionStatefulVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsExpressionStatefulVisitor.java
new file mode 100644
index 0000000..1a7505b
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsExpressionStatefulVisitor.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+
+/**
+ * This visitor determines whether a logical expression contains any stateful
+ * function call expression.
+ */
+public class IsExpressionStatefulVisitor implements ILogicalExpressionVisitor<Boolean, Void> {
+
+    @Override
+    public Boolean visitConstantExpression(ConstantExpression expr, Void arg) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public Boolean visitVariableReferenceExpression(VariableReferenceExpression expr, Void arg)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public Boolean visitAggregateFunctionCallExpression(AggregateFunctionCallExpression expr, Void arg)
+            throws AlgebricksException {
+        return visitFunctionExpression(expr, arg);
+    }
+
+    @Override
+    public Boolean visitScalarFunctionCallExpression(ScalarFunctionCallExpression expr, Void arg)
+            throws AlgebricksException {
+        return visitFunctionExpression(expr, arg);
+    }
+
+    @Override
+    public Boolean visitStatefulFunctionCallExpression(StatefulFunctionCallExpression expr, Void arg)
+            throws AlgebricksException {
+        return true;
+    }
+
+    @Override
+    public Boolean visitUnnestingFunctionCallExpression(UnnestingFunctionCallExpression expr, Void arg)
+            throws AlgebricksException {
+        return visitFunctionExpression(expr, arg);
+    }
+
+    private boolean visitFunctionExpression(AbstractFunctionCallExpression expr, Void arg) throws AlgebricksException {
+        for (Mutable<ILogicalExpression> argRef : expr.getArguments()) {
+            if (argRef.getValue().accept(this, arg)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
index 5eca2bd..36111ff 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
@@ -123,7 +123,11 @@
     @Override
     public ILogicalExpression visitStatefulFunctionCallExpression(StatefulFunctionCallExpression expr, Void arg)
             throws AlgebricksException {
-        throw new UnsupportedOperationException();
+        StatefulFunctionCallExpression exprCopy = new StatefulFunctionCallExpression(expr.getFunctionInfo(),
+                expr.getPropertiesComputer(), deepCopyExpressionReferenceList(expr.getArguments()));
+        deepCopyAnnotations(expr, exprCopy);
+        deepCopyOpaqueParameters(expr, exprCopy);
+        return exprCopy;
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
new file mode 100644
index 0000000..3f0772a
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * This visitor propagates primary key information for each operator.
+ * NOTE: since the primary key information in data-source-scan is determined by the specific
+ * data source, this visitor, at the Algebricks level, could not generate that information.
+ */
+public class PrimaryKeyVariablesVisitor implements ILogicalOperatorVisitor<Void, IOptimizationContext> {
+
+    @Override
+    public Void visitAggregateOperator(AggregateOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        ctx.addPrimaryKey(new FunctionalDependency(op.getVariables(), op.getVariables()));
+        return null;
+    }
+
+    @Override
+    public Void visitRunningAggregateOperator(RunningAggregateOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitGroupByOperator(GroupByOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        List<LogicalVariable> header = new ArrayList<>();
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> gbyTerm : op.getGroupByList()) {
+            header.add(gbyTerm.first);
+        }
+        List<LogicalVariable> liveVars = new ArrayList<>();
+        VariableUtilities.getSubplanLocalLiveVariables(op, liveVars);
+        ctx.addPrimaryKey(new FunctionalDependency(header, liveVars));
+        return null;
+    }
+
+    @Override
+    public Void visitLimitOperator(LimitOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitInnerJoinOperator(InnerJoinOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitOrderOperator(OrderOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitAssignOperator(AssignOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        // Obtain used variables on the right-hand side of an assign.
+        Set<LogicalVariable> usedVars = new HashSet<>();
+        VariableUtilities.getUsedVariables(op, usedVars);
+        Set<LogicalVariable> primaryKeyVars = null;
+        for (LogicalVariable usedVar : usedVars) {
+            List<LogicalVariable> keyVars = ctx.findPrimaryKey(usedVar);
+            if (keyVars == null) {
+                // No key variables can uniquely identify usedVar.
+                return null;
+            }
+            if (primaryKeyVars == null) {
+                primaryKeyVars = new HashSet<>(keyVars);
+            } else {
+                // The primary key is the union of all the key header variables.
+                primaryKeyVars.addAll(keyVars);
+            }
+        }
+        if (primaryKeyVars != null && !primaryKeyVars.isEmpty()) {
+            List<LogicalVariable> producedVars = new ArrayList<>();
+            VariableUtilities.getProducedVariables(op, producedVars);
+            // Generates new primary keys.
+            ctx.addPrimaryKey(new FunctionalDependency(new ArrayList<LogicalVariable>(primaryKeyVars), producedVars));
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitSelectOperator(SelectOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitExtensionOperator(ExtensionOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitProjectOperator(ProjectOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitReplicateOperator(ReplicateOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitScriptOperator(ScriptOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitSubplanOperator(SubplanOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitSinkOperator(SinkOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitUnionOperator(UnionAllOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitIntersectOperator(IntersectOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestOperator(UnnestOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitOuterUnnestOperator(OuterUnnestOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestMapOperator(UnnestMapOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitDataScanOperator(DataSourceScanOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitDistinctOperator(DistinctOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitExchangeOperator(ExchangeOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitWriteOperator(WriteOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitWriteResultOperator(WriteResultOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, IOptimizationContext arg)
+            throws AlgebricksException {
+        return null;
+    }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index dfc12bf..0a434ad 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -37,8 +37,10 @@
 import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.CardinalityInferenceVisitor;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsExpressionStatefulVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 
 public class OperatorPropertiesUtil {
@@ -265,4 +267,27 @@
         CardinalityInferenceVisitor visitor = new CardinalityInferenceVisitor();
         return operator.accept(visitor, null) == 1L;
     }
+
+    /**
+     * Whether the operator is an assign operator that calls a stateful function.
+     *
+     * @param op
+     *            the operator to consider.
+     * @return true if the operator is an assign operator and it calls a stateful function.
+     * @throws AlgebricksException
+     */
+    public static boolean isStatefulAssign(ILogicalOperator op) throws AlgebricksException {
+        if (op.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return false;
+        }
+        AssignOperator assignOp = (AssignOperator) op;
+        IsExpressionStatefulVisitor visitor = new IsExpressionStatefulVisitor();
+        for (Mutable<ILogicalExpression> exprRef : assignOp.getExpressions()) {
+            ILogicalExpression expr = exprRef.getValue();
+            if (expr.accept(visitor, null)) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index 0d601b0..e76b486 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -66,7 +66,7 @@
 
     private Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<ILogicalOperator, HashSet<ILogicalOperator>>();
     private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<IAlgebraicRewriteRule, HashSet<ILogicalOperator>>();
-    private Map<LogicalVariable, FunctionalDependency> recordToPrimaryKey = new HashMap<LogicalVariable, FunctionalDependency>();
+    private Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<LogicalVariable, FunctionalDependency>();
 
     private IMetadataProvider metadataProvider;
     private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<LogicalVariable>();
@@ -189,18 +189,18 @@
 
     @Override
     public void addPrimaryKey(FunctionalDependency pk) {
-        assert (pk.getTail().size() == 1);
-        LogicalVariable recordVar = pk.getTail().get(0);
-        recordToPrimaryKey.put(recordVar, pk);
+        for (LogicalVariable var : pk.getTail()) {
+            varToPrimaryKey.put(var, pk);
+        }
     }
 
     @Override
     public List<LogicalVariable> findPrimaryKey(LogicalVariable recordVar) {
-        FunctionalDependency fd = recordToPrimaryKey.get(recordVar);
+        FunctionalDependency fd = varToPrimaryKey.get(recordVar);
         if (fd == null) {
             return null;
         }
-        return fd.getHead();
+        return new ArrayList<LogicalVariable>(fd.getHead());
     }
 
     @Override
@@ -291,7 +291,7 @@
 
     @Override
     public void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars) {
-        for (Map.Entry<LogicalVariable, FunctionalDependency> me : recordToPrimaryKey.entrySet()) {
+        for (Map.Entry<LogicalVariable, FunctionalDependency> me : varToPrimaryKey.entrySet()) {
             FunctionalDependency fd = me.getValue();
             List<LogicalVariable> hd = new ArrayList<LogicalVariable>();
             for (LogicalVariable v : fd.getHead()) {
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateSelectsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateSelectsRule.java
index 0a4672b..2aee1a7 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateSelectsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateSelectsRule.java
@@ -20,7 +20,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -32,15 +31,14 @@
 import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 /**
  * Matches the following operator pattern:
  * (select) <-- ((assign)* <-- (select)*)+
- *
  * Consolidates the selects to:
  * (select) <-- (assign)*
- *
  */
 public class ConsolidateSelectsRule implements IAlgebraicRewriteRule {
 
@@ -50,8 +48,9 @@
     }
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
-    	AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.SELECT) {
             return false;
         }
@@ -63,24 +62,25 @@
         AbstractLogicalOperator topMostOp = null;
         AbstractLogicalOperator selectParent = null;
         AbstractLogicalOperator nextSelect = firstSelect;
-		do {
-        	// Skip through assigns.
+        do {
+            // Skip through assigns.
             do {
-            	selectParent = nextSelect;
-            	nextSelect = (AbstractLogicalOperator) selectParent.getInputs().get(0).getValue();
-            } while (nextSelect.getOperatorTag() == LogicalOperatorTag.ASSIGN);
+                selectParent = nextSelect;
+                nextSelect = (AbstractLogicalOperator) selectParent.getInputs().get(0).getValue();
+            } while (nextSelect.getOperatorTag() == LogicalOperatorTag.ASSIGN && !OperatorPropertiesUtil
+                    .isStatefulAssign(nextSelect) /* Select cannot be pushed through stateful assigns.*/);
             // Stop if the child op is not a select.
             if (nextSelect.getOperatorTag() != LogicalOperatorTag.SELECT) {
-        		break;
-        	}
+                break;
+            }
             // Remember the top-most op that we are not removing.
             topMostOp = selectParent;
 
             // Initialize the new conjuncts, if necessary.
             if (conj == null) {
-            	conj = new ScalarFunctionCallExpression(andFn);
-            	// Add the first select's condition.
-            	conj.getArguments().add(new MutableObject<ILogicalExpression>(firstSelect.getCondition().getValue()));
+                conj = new ScalarFunctionCallExpression(andFn);
+                // Add the first select's condition.
+                conj.getArguments().add(new MutableObject<ILogicalExpression>(firstSelect.getCondition().getValue()));
             }
 
             // Consolidate all following selects.
@@ -93,16 +93,16 @@
 
             // Hook up the input of the top-most remaining op if necessary.
             if (topMostOp.getOperatorTag() == LogicalOperatorTag.ASSIGN || topMostOp == firstSelect) {
-            	topMostOp.getInputs().set(0, selectParent.getInputs().get(0));
+                topMostOp.getInputs().set(0, selectParent.getInputs().get(0));
             }
 
             // Prepare for next iteration.
             nextSelect = selectParent;
         } while (true);
 
-		// Did we consolidate any selects?
+        // Did we consolidate any selects?
         if (conj == null) {
-        	return false;
+            return false;
         }
 
         // Set the new conjuncts.
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java
index 71c4776..3d3331c 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectDownRule.java
@@ -22,7 +22,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -42,7 +41,8 @@
     }
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.SELECT) {
             return false;
@@ -72,7 +72,8 @@
     private static boolean propagateSelectionRec(Mutable<ILogicalOperator> sigmaRef, Mutable<ILogicalOperator> opRef2)
             throws AlgebricksException {
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
-        if (op2.getInputs().size() != 1 || op2.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
+        if (op2.getInputs().size() != 1 || op2.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN
+                || OperatorPropertiesUtil.isStatefulAssign(op2)) {
             return false;
         }
 
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
index 21e9c06..1ba1ea3 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class SimpleUnnestToProductRule implements IAlgebraicRewriteRule {
@@ -109,7 +110,8 @@
                     && boundaryOpRef.getValue().getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN) {
                 List<LogicalVariable> opUsedVars = new ArrayList<LogicalVariable>();
                 VariableUtilities.getUsedVariables(boundaryOpRef.getValue(), opUsedVars);
-                if (opUsedVars.size() == 0) {
+                if (opUsedVars.size() == 0 && !OperatorPropertiesUtil.isStatefulAssign(boundaryOpRef.getValue())
+                /* We cannot freely move the location of stateful assigns. */) {
                     // move down the boundary if the operator is a const assigns.
                     boundaryOpRef = boundaryOpRef.getValue().getInputs().get(0);
                 } else {
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/PhysicalOptimizationsUtil.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/PhysicalOptimizationsUtil.java
index 7be0b16..99480bf 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/PhysicalOptimizationsUtil.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/PhysicalOptimizationsUtil.java
@@ -22,7 +22,6 @@
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -32,24 +31,31 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.FDsAndEquivClassesVisitor;
-import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 public class PhysicalOptimizationsUtil {
 
-    public static void computeFDsAndEquivalenceClasses(AbstractLogicalOperator op, IOptimizationContext ctx)
+    public static void computeFDsAndEquivalenceClasses(ILogicalOperator op, IOptimizationContext ctx)
             throws AlgebricksException {
         FDsAndEquivClassesVisitor visitor = new FDsAndEquivClassesVisitor();
+        visitOperatorAndItsDescendants(op, visitor, ctx);
+    }
+
+    public static <R> void visitOperatorAndItsDescendants(ILogicalOperator op, ILogicalOperatorVisitor<R, IOptimizationContext> visitor,
+            IOptimizationContext ctx) throws AlgebricksException {
         Set<ILogicalOperator> visitSet = new HashSet<ILogicalOperator>();
         computeFDsAndEqClassesWithVisitorRec(op, ctx, visitor, visitSet);
     }
 
-    private static void computeFDsAndEqClassesWithVisitorRec(AbstractLogicalOperator op, IOptimizationContext ctx,
-            FDsAndEquivClassesVisitor visitor, Set<ILogicalOperator> visitSet) throws AlgebricksException {
+    private static <R> void computeFDsAndEqClassesWithVisitorRec(ILogicalOperator op, IOptimizationContext ctx,
+            ILogicalOperatorVisitor<R, IOptimizationContext> visitor, Set<ILogicalOperator> visitSet)
+                    throws AlgebricksException {
         visitSet.add(op);
         for (Mutable<ILogicalOperator> i : op.getInputs()) {
             computeFDsAndEqClassesWithVisitorRec((AbstractLogicalOperator) i.getValue(), ctx, visitor, visitSet);
         }
-        if (op.hasNestedPlans()) {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
+        if (aop.hasNestedPlans()) {
             for (ILogicalPlan p : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
                 for (Mutable<ILogicalOperator> r : p.getRoots()) {
                     AbstractLogicalOperator rootOp = (AbstractLogicalOperator) r.getValue();
@@ -65,11 +71,6 @@
             }
         }
         op.accept(visitor, ctx);
-        if (AlgebricksConfig.DEBUG) {
-            AlgebricksConfig.ALGEBRICKS_LOGGER.fine("--> op. type = " + op.getOperatorTag() + "\n"
-                    + "    equiv. classes = " + ctx.getEquivalenceClassMap(op) + "\n" + "    FDs = "
-                    + ctx.getFDList(op) + "\n");
-        }
     }
 
 }