1. Tracks the variables coming from the outer branch of loj to later decide whether they are still nullable
2. Deals with variables in nested subplans in OperatorPropertiesUtil

Change-Id: I6438fad75ee308e3f6c2b276f0d6b7c882e5b379
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/121
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/INullableTypeComputer.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/INullableTypeComputer.java
index 877d363..d3f6d19 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/INullableTypeComputer.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/INullableTypeComputer.java
@@ -18,4 +18,8 @@
 
 public interface INullableTypeComputer {
     public Object makeNullableType(Object type) throws AlgebricksException;
+
+    public boolean canBeNull(Object type);
+
+    public Object getNonOptionalType(Object type);
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/IVariableTypeEnvironment.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/IVariableTypeEnvironment.java
index 3ba438a..8bcc575 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/IVariableTypeEnvironment.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/IVariableTypeEnvironment.java
@@ -23,7 +23,8 @@
 public interface IVariableTypeEnvironment {
     public Object getVarType(LogicalVariable var) throws AlgebricksException;
 
-    public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariables) throws AlgebricksException;
+    public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariables,
+            List<List<LogicalVariable>> correlatedNullableVariableLists) throws AlgebricksException;
 
     public void setVarType(LogicalVariable var, Object type);
 
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index c6e93fe..528ae4e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -173,8 +173,7 @@
         return createPropagatingAllInputsTypeEnvironment(ctx);
     }
 
-    protected IVariableTypeEnvironment createPropagatingAllInputsTypeEnvironment(ITypingContext ctx) {
-        //        return createPropagatingAllInputsTypeEnvironment(ctx);
+    protected PropagatingTypeEnvironment createPropagatingAllInputsTypeEnvironment(ITypingContext ctx) {
         int n = inputs.size();
         ITypeEnvPointer[] envPointers = new ITypeEnvPointer[n];
         for (int i = 0; i < n; i++) {
@@ -188,4 +187,4 @@
     public boolean requiresVariableReferenceExpressions() {
         return true;
     }
-}
\ No newline at end of file
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
index 0af3dbc..0d40307 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
@@ -20,11 +20,14 @@
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 /**
@@ -78,13 +81,22 @@
 
     @Override
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
-        IVariableTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
+        PropagatingTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
         int n = variables.size();
         for (int i = 0; i < n; i++) {
             env.setVarType(
                     variables.get(i),
                     ctx.getExpressionTypeComputer().getType(expressions.get(i).getValue(), ctx.getMetadataProvider(),
                             env));
+            if (expressions.get(i).getValue().getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                LogicalVariable var = ((VariableReferenceExpression) expressions.get(i).getValue())
+                        .getVariableReference();
+                for (List<LogicalVariable> list : env.getCorrelatedNullableVariableLists()) {
+                    if (list.contains(var)) {
+                        list.add(variables.get(i));
+                    }
+                }
+            }
         }
         return env;
     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/EmptyTupleSourceOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/EmptyTupleSourceOperator.java
index 5872e6d..7d16ed1 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/EmptyTupleSourceOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/EmptyTupleSourceOperator.java
@@ -80,8 +80,8 @@
             }
 
             @Override
-            public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariables)
-                    throws AlgebricksException {
+            public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariables,
+                    List<List<LogicalVariable>> correlatedNullableVariableLists) throws AlgebricksException {
                 return null;
             }
 
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LeftOuterJoinOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LeftOuterJoinOperator.java
index 8c37099..1504039 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LeftOuterJoinOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LeftOuterJoinOperator.java
@@ -14,13 +14,18 @@
  */
 package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.TypePropagationPolicy;
 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypeEnvPointer;
 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
@@ -56,8 +61,12 @@
         for (int i = 0; i < n; i++) {
             envPointers[i] = new OpRefTypeEnvPointer(inputs.get(i), ctx);
         }
-        return new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getNullableTypeComputer(),
-                ctx.getMetadataProvider(), TypePropagationPolicy.LEFT_OUTER, envPointers);
+        PropagatingTypeEnvironment env = new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
+                ctx.getNullableTypeComputer(), ctx.getMetadataProvider(), TypePropagationPolicy.LEFT_OUTER, envPointers);
+        List<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
+        VariableUtilities.getLiveVariables(inputs.get(1).getValue(), liveVars); // live variables from outer branch can be null together
+        env.getCorrelatedNullableVariableLists().add(liveVars);
+        return env;
     }
 
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/TypePropagationPolicy.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/TypePropagationPolicy.java
index 16fed48..e1824d6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/TypePropagationPolicy.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/TypePropagationPolicy.java
@@ -27,14 +27,26 @@
 
         @Override
         public Object getVarType(LogicalVariable var, INullableTypeComputer ntc,
-                List<LogicalVariable> nonNullVariableList, ITypeEnvPointer... typeEnvs) throws AlgebricksException {
+                List<LogicalVariable> nonNullVariableList, List<List<LogicalVariable>> correlatedNullableVariableLists,
+                ITypeEnvPointer... typeEnvs) throws AlgebricksException {
             for (ITypeEnvPointer p : typeEnvs) {
                 IVariableTypeEnvironment env = p.getTypeEnv();
                 if (env == null) {
                     throw new AlgebricksException("Null environment for pointer " + p + " in getVarType for var=" + var);
                 }
-                Object t = env.getVarType(var, nonNullVariableList);
+                Object t = env.getVarType(var, nonNullVariableList, correlatedNullableVariableLists);
                 if (t != null) {
+                    if (ntc != null && ntc.canBeNull(t)) {
+                        for (List<LogicalVariable> list : correlatedNullableVariableLists) {
+                            if (list.contains(var)) {
+                                for (LogicalVariable v : list) {
+                                    if (nonNullVariableList.contains(v)) {
+                                        return ntc.getNonOptionalType(t);
+                                    }
+                                }
+                            }
+                        }
+                    }
                     return t;
                 }
             }
@@ -46,10 +58,12 @@
 
         @Override
         public Object getVarType(LogicalVariable var, INullableTypeComputer ntc,
-                List<LogicalVariable> nonNullVariableList, ITypeEnvPointer... typeEnvs) throws AlgebricksException {
+                List<LogicalVariable> nonNullVariableList, List<List<LogicalVariable>> correlatedNullableVariableLists,
+                ITypeEnvPointer... typeEnvs) throws AlgebricksException {
             int n = typeEnvs.length;
             for (int i = 0; i < n; i++) {
-                Object t = typeEnvs[i].getTypeEnv().getVarType(var, nonNullVariableList);
+                Object t = typeEnvs[i].getTypeEnv().getVarType(var, nonNullVariableList,
+                        correlatedNullableVariableLists);
                 if (t != null) {
                     if (i == 0) { // inner branch
                         return t;
@@ -78,5 +92,5 @@
     };
 
     public abstract Object getVarType(LogicalVariable var, INullableTypeComputer ntc,
-            List<LogicalVariable> nonNullVariableList, ITypeEnvPointer... typeEnvs) throws AlgebricksException;
+            List<LogicalVariable> nonNullVariableList, List<List<LogicalVariable>> correlatedNullableVariableLists, ITypeEnvPointer... typeEnvs) throws AlgebricksException;
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/typing/NonPropagatingTypeEnvironment.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/typing/NonPropagatingTypeEnvironment.java
index 7e744dd..eb0eb71 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/typing/NonPropagatingTypeEnvironment.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/typing/NonPropagatingTypeEnvironment.java
@@ -34,7 +34,8 @@
     }
 
     @Override
-    public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariables) throws AlgebricksException {
+    public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariables,
+            List<List<LogicalVariable>> correlatedNullableVariableLists) throws AlgebricksException {
         return getVarType(var);
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/typing/PropagateOperatorInputsTypeEnvironment.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/typing/PropagateOperatorInputsTypeEnvironment.java
index 71bc891..87ee385 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/typing/PropagateOperatorInputsTypeEnvironment.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/typing/PropagateOperatorInputsTypeEnvironment.java
@@ -29,6 +29,7 @@
 public class PropagateOperatorInputsTypeEnvironment extends AbstractTypeEnvironment {
 
     private final List<LogicalVariable> nonNullVariables = new ArrayList<LogicalVariable>();
+    private final List<List<LogicalVariable>> correlatedNullableVariableLists = new ArrayList<List<LogicalVariable>>();
     private final ILogicalOperator op;
     private final ITypingContext ctx;
 
@@ -44,13 +45,14 @@
     }
 
     @Override
-    public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariableList) throws AlgebricksException {
+    public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariableList,
+            List<List<LogicalVariable>> correlatedNullableVariableLists) throws AlgebricksException {
         nonNullVariableList.addAll(nonNullVariables);
-        return getVarTypeFullList(var, nonNullVariableList);
+        return getVarTypeFullList(var, nonNullVariableList, correlatedNullableVariableLists);
     }
 
-    private Object getVarTypeFullList(LogicalVariable var, List<LogicalVariable> nonNullVariableList)
-            throws AlgebricksException {
+    private Object getVarTypeFullList(LogicalVariable var, List<LogicalVariable> nonNullVariableList,
+            List<List<LogicalVariable>> correlatedNullableVariableLists) throws AlgebricksException {
         Object t = varTypeMap.get(var);
         if (t != null) {
             return t;
@@ -58,7 +60,7 @@
         for (Mutable<ILogicalOperator> r : op.getInputs()) {
             ILogicalOperator c = r.getValue();
             IVariableTypeEnvironment env = ctx.getOutputTypeEnvironment(c);
-            Object t2 = env.getVarType(var, nonNullVariableList);
+            Object t2 = env.getVarType(var, nonNullVariableList, correlatedNullableVariableLists);
             if (t2 != null) {
                 return t2;
             }
@@ -68,7 +70,7 @@
 
     @Override
     public Object getVarType(LogicalVariable var) throws AlgebricksException {
-        return getVarTypeFullList(var, nonNullVariables);
+        return getVarTypeFullList(var, nonNullVariables, correlatedNullableVariableLists);
     }
 
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java
index 4294571..d167cd7 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java
@@ -34,6 +34,8 @@
 
     private final List<LogicalVariable> nonNullVariables = new ArrayList<LogicalVariable>();
 
+    private final List<List<LogicalVariable>> correlatedNullableVariableLists = new ArrayList<List<LogicalVariable>>();
+
     public PropagatingTypeEnvironment(IExpressionTypeComputer expressionTypeComputer,
             INullableTypeComputer nullableTypeComputer, IMetadataProvider<?, ?> metadataProvider,
             TypePropagationPolicy policy, ITypeEnvPointer[] envPointers) {
@@ -45,25 +47,41 @@
 
     @Override
     public Object getVarType(LogicalVariable var) throws AlgebricksException {
-        return getVarTypeFullList(var, nonNullVariables);
+        return getVarTypeFullList(var, nonNullVariables, correlatedNullableVariableLists);
     }
 
     public List<LogicalVariable> getNonNullVariables() {
         return nonNullVariables;
     }
 
-    @Override
-    public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariableList) throws AlgebricksException {
-        nonNullVariableList.addAll(nonNullVariables);
-        return getVarTypeFullList(var, nonNullVariableList);
+    public List<List<LogicalVariable>> getCorrelatedNullableVariableLists() {
+        return correlatedNullableVariableLists;
     }
 
-    private Object getVarTypeFullList(LogicalVariable var, List<LogicalVariable> nonNullVariableList)
-            throws AlgebricksException {
+    @Override
+    public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariableList,
+            List<List<LogicalVariable>> correlatedNullableVariableLists) throws AlgebricksException {
+        for (LogicalVariable v : nonNullVariables) {
+            if (!nonNullVariableList.contains(v)) {
+                nonNullVariableList.add(v);
+            }
+        }
+        Object t = getVarTypeFullList(var, nonNullVariableList, correlatedNullableVariableLists);
+        for (List<LogicalVariable> list : this.correlatedNullableVariableLists) {
+            if (!correlatedNullableVariableLists.contains(list)) {
+                correlatedNullableVariableLists.add(list);
+            }
+        }
+        return t;
+    }
+
+    private Object getVarTypeFullList(LogicalVariable var, List<LogicalVariable> nonNullVariableList,
+            List<List<LogicalVariable>> correlatedNullableVariableLists) throws AlgebricksException {
         Object t = varTypeMap.get(var);
         if (t != null) {
             return t;
         }
-        return policy.getVarType(var, nullableTypeComputer, nonNullVariableList, envPointers);
+        return policy.getVarType(var, nullableTypeComputer, nonNullVariableList, correlatedNullableVariableLists,
+                envPointers);
     }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index 6648252..ec9b403 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -137,6 +137,16 @@
             return true;
         }
         boolean onPath = false;
+        if (((AbstractLogicalOperator) op).hasNestedPlans()) {
+            AbstractOperatorWithNestedPlans a = (AbstractOperatorWithNestedPlans) op;
+            for (ILogicalPlan p : a.getNestedPlans()) {
+                for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                    if (isDestInNestedPath((AbstractLogicalOperator) r.getValue(), dest)) {
+                        onPath = true;
+                    }
+                }
+            }
+        }
         for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
             if (collectUsedAndProducedVariablesInPath(childRef.getValue(), dest, usedVars, producedVars)) {
                 onPath = true;
@@ -149,6 +159,35 @@
         return onPath;
     }
 
+    /***
+     * Recursively checks if the dest operator is in the path of a nested plan
+     * 
+     * @param op
+     * @param dest
+     * @return
+     */
+    private static boolean isDestInNestedPath(AbstractLogicalOperator op, ILogicalOperator dest) {
+        if (op == dest) {
+            return true;
+        }
+        for (Mutable<ILogicalOperator> i : op.getInputs()) {
+            if (isDestInNestedPath((AbstractLogicalOperator) i.getValue(), dest)) {
+                return true;
+            }
+        }
+        if (op.hasNestedPlans()) {
+            AbstractOperatorWithNestedPlans a = (AbstractOperatorWithNestedPlans) op;
+            for (ILogicalPlan p : a.getNestedPlans()) {
+                for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                    if (isDestInNestedPath((AbstractLogicalOperator) r.getValue(), dest)) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
     public static void getFreeVariablesInSubplans(AbstractOperatorWithNestedPlans op, Set<LogicalVariable> freeVars)
             throws AlgebricksException {
         for (ILogicalPlan p : op.getNestedPlans()) {