add top-down record type propagation: from the inserting target data source to a record-constructor function call expression

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@66 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index d626ef3..aa22be5 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -3,7 +3,6 @@
 import java.util.LinkedList;
 import java.util.List;
 
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.*; 
 import edu.uci.ics.asterix.optimizer.rules.AsterixInlineVariablesRule;
 import edu.uci.ics.asterix.optimizer.rules.ByNameToByIndexFieldAccessRule;
 import edu.uci.ics.asterix.optimizer.rules.ConstantFoldingRule;
@@ -28,9 +27,43 @@
 import edu.uci.ics.asterix.optimizer.rules.RemoveRedundantListifyRule;
 import edu.uci.ics.asterix.optimizer.rules.SetAsterixPhysicalOperatorsRule;
 import edu.uci.ics.asterix.optimizer.rules.SetClosedRecordConstructorsRule;
+import edu.uci.ics.asterix.optimizer.rules.TopDownTypeInferenceRule;
 import edu.uci.ics.asterix.optimizer.rules.UnnestToDataScanRule;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.BreakSelectIntoConjunctsRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.ComplexJoinInferenceRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.ConsolidateAssignsRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.ConsolidateSelectsRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.EliminateSubplanRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.EnforceStructuralPropertiesRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.ExtractCommonOperatorsRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.ExtractGbyExpressionsRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.FactorRedundantGroupAndDecorVarsRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.InferTypesRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.InsertOuterJoinRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.InsertProjectBeforeUnionRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroHashPartitionMergeExchange;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroJoinInsideSubplanRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceCombinerRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByForStandaloneAggregRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByForSubplanRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushAssignDownThroughProductRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushDieUpRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushLimitDownRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushNestedOrderByUnderPreSortedGroupByRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushProjectDownRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushSelectDownRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushSelectIntoJoinRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushSubplanWithAggregateDownThroughProductRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.ReinferAllTypesRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.RemoveUnusedAssignAndAggregateRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperatorsRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.SetExecutionModeRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.SimpleUnnestToProductRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.SubplanOutOfGroupRule;
 
 public final class RuleCollections {
 
@@ -48,6 +81,7 @@
         normalization.add(new ExtractGbyExpressionsRule());
         normalization.add(new ExtractDistinctByExpressionsRule());
         normalization.add(new ExtractOrderExpressionsRule());
+        normalization.add(new TopDownTypeInferenceRule());
         normalization.add(new ConstantFoldingRule());
         normalization.add(new UnnestToDataScanRule());
         normalization.add(new IfElseToSwitchCaseFunctionRule());
@@ -163,4 +197,4 @@
         return prepareForJobGenRewrites;
     }
 
-}
\ No newline at end of file
+}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java
index cf4747b..c0f9d82 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -100,7 +100,8 @@
     }
 
     @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         ILogicalOperator op = opRef.getValue();
         if (context.checkIfInDontApplySet(this, op)) {
             return false;
@@ -138,7 +139,6 @@
             return new Pair<Boolean, ILogicalExpression>(false, expr);
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public Pair<Boolean, ILogicalExpression> visitScalarFunctionCallExpression(ScalarFunctionCallExpression expr,
                 Void arg) throws AlgebricksException {
@@ -146,6 +146,10 @@
             if (!checkArgs(expr)) {
                 return new Pair<Boolean, ILogicalExpression>(changed, expr);
             }
+            // TODO: currently ARecord is always a closed record
+            if (expr.getFunctionIdentifier().equals(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)) {
+                return new Pair<Boolean, ILogicalExpression>(false, null);
+            }
             if (expr.getFunctionIdentifier().equals(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME)) {
                 ARecordType rt = (ARecordType) _emptyTypeEnv.getType(expr.getArguments().get(0).getValue());
                 String str = ((AString) ((AsterixConstantValue) ((ConstantExpression) expr.getArguments().get(1)
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java
new file mode 100644
index 0000000..a9d1055
--- /dev/null
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/TopDownTypeInferenceRule.java
@@ -0,0 +1,117 @@
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class TopDownTypeInferenceRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        /**
+         * pattern match: sink/insert/assign
+         * record type is propagated from insert data source to the record-constructor expression
+         */
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.SINK)
+            return false;
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+        if (op2.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE)
+            return false;
+        AbstractLogicalOperator op3 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
+        if (op3.getOperatorTag() != LogicalOperatorTag.ASSIGN)
+            return false;
+
+        /**
+         * get required record type
+         */
+        InsertDeleteOperator insertDeleteOperator = (InsertDeleteOperator) op2;
+        AssignOperator oldAssignOperator = (AssignOperator) op3;
+        AqlDataSource dataSource = (AqlDataSource) insertDeleteOperator.getDataSource();
+        IAType[] schemaTypes = (IAType[]) dataSource.getSchemaTypes();
+        ARecordType requiredRecordType = (ARecordType) schemaTypes[schemaTypes.length - 1];
+
+        /**
+         * get input record type to the insert operator
+         */
+        List<LogicalVariable> usedVariables = new ArrayList<LogicalVariable>();
+        VariableUtilities.getUsedVariables(oldAssignOperator, usedVariables);
+        if (usedVariables.size() == 0)
+            return false;
+        LogicalVariable oldRecordVariable = usedVariables.get(0);
+        LogicalVariable inputRecordVar = usedVariables.get(0);
+        IVariableTypeEnvironment env = oldAssignOperator.computeOutputTypeEnvironment(context);
+        ARecordType inputRecordType = (ARecordType) env.getVarType(inputRecordVar);
+
+        AbstractLogicalOperator currentOperator = oldAssignOperator;
+        List<LogicalVariable> producedVariables = new ArrayList<LogicalVariable>();
+        boolean changed = false;
+        if (!requiredRecordType.equals(inputRecordType)) {
+            /**
+             * find the assign operator for the "input record" to the
+             * insert_delete operator
+             */
+            do {
+                if (currentOperator.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+                    producedVariables.clear();
+                    VariableUtilities.getProducedVariables(currentOperator, producedVariables);
+                    int position = producedVariables.indexOf(oldRecordVariable);
+
+                    /**
+                     * set the top-down propagated type
+                     */
+                    if (position >= 0) {
+                        AssignOperator originalAssign = (AssignOperator) currentOperator;
+                        List<Mutable<ILogicalExpression>> expressionPointers = originalAssign.getExpressions();
+                        ILogicalExpression expr = expressionPointers.get(position).getValue();
+                        if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                            ScalarFunctionCallExpression funcExpr = (ScalarFunctionCallExpression) expr;
+                            changed = TypeComputerUtilities.setRequiredType(funcExpr, requiredRecordType);
+                            List<Mutable<ILogicalExpression>> args = funcExpr.getArguments();
+                            int openPartStart = requiredRecordType.getFieldTypes().length * 2;
+                            for (int j = openPartStart; j < args.size(); j++) {
+                                ILogicalExpression arg = args.get(j).getValue();
+                                if (arg.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                                    AbstractFunctionCallExpression argFunc = (AbstractFunctionCallExpression) arg;
+                                    TypeComputerUtilities.setOpenType(argFunc, true);
+                                }
+                            }
+                        }
+                    }
+                }
+                if (currentOperator.getInputs().size() > 0)
+                    currentOperator = (AbstractLogicalOperator) currentOperator.getInputs().get(0).getValue();
+                else
+                    break;
+            } while (currentOperator != null);
+        }
+        return changed;
+    }
+}
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-insert.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-insert.aql
new file mode 100644
index 0000000..d72372f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-insert.aql
@@ -0,0 +1,21 @@
+create dataverse testdv2;
+use dataverse testdv2;
+
+
+create type testtype as open {
+  id: string,
+  name: string
+}
+
+create nodegroup grouptest on nc1;
+
+create dataset testds(testtype) partitioned by key id on grouptest;
+ 
+ insert into dataset testds (
+ { "id": "001", "name": "Person Three", "hobbies": {{"scuba", "music"}}}
+ );
+
+write output to nc1:"rttest/dml_opentype-insert.adm";
+
+for $d in dataset("testds") 
+return $d
diff --git a/asterix-app/src/test/resources/runtimets/results/dml/opentype-insert.adm b/asterix-app/src/test/resources/runtimets/results/dml/opentype-insert.adm
new file mode 100644
index 0000000..6d6dc2a
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/dml/opentype-insert.adm
@@ -0,0 +1 @@
+{ "id": "001", "name": "Person Three", "hobbies": {{ "scuba", "music" }} }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/base/TypeComputerUtilities.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/base/TypeComputerUtilities.java
new file mode 100644
index 0000000..abd4d66
--- /dev/null
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/base/TypeComputerUtilities.java
@@ -0,0 +1,46 @@
+package edu.uci.ics.asterix.om.typecomputer.base;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+
+public class TypeComputerUtilities {
+
+    public static void setOpenType(AbstractFunctionCallExpression expr, boolean openType) {
+        Boolean openField = true;
+        Object[] opaqueParameters = new Object[1];
+        opaqueParameters[0] = openField;
+        expr.setOpaqueParameters(opaqueParameters);
+    }
+
+    public static boolean isOpenType(AbstractFunctionCallExpression expr) {
+        boolean openType = false;
+        Object[] opaqueParameters = expr.getOpaqueParameters();
+        if (opaqueParameters != null) {
+            openType = (Boolean) opaqueParameters[0];
+        }
+        return openType;
+    }
+
+    public static boolean setRequiredType(AbstractFunctionCallExpression expr, ARecordType requiredRecordType) {
+        boolean changed = false;
+        Object opaqueParameter = expr.getOpaqueParameters();
+        if (opaqueParameter == null) {
+            opaqueParameter = requiredRecordType;
+            Object[] opaqueParameters = new Object[1];
+            opaqueParameters[0] = opaqueParameter;
+            expr.setOpaqueParameters(opaqueParameters);
+            changed = true;
+        }
+        return changed;
+    }
+
+    public static ARecordType getRequiredType(AbstractFunctionCallExpression expr) {
+        Object[] type = expr.getOpaqueParameters();
+        if (type != null) {
+            ARecordType recordType = (ARecordType) type[0];
+            return recordType;
+        } else
+            return null;
+    }
+
+}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
index f6eaaa4..e9f93e5 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
@@ -9,6 +9,7 @@
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
 import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
+import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.types.TypeHelper;
@@ -28,6 +29,14 @@
     public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
             IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
         AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+
+        /**
+         * if type has been top-down propagated, use the enforced type
+         */
+        ARecordType type = TypeComputerUtilities.getRequiredType(f);
+        if (type != null)
+            return type;
+
         int n = 0;
         Iterator<Mutable<ILogicalExpression>> argIter = f.getArguments().iterator();
         List<String> namesList = new ArrayList<String>();
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/OrderedListConstructorResultType.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/OrderedListConstructorResultType.java
index ef569a7..cf68dda 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/OrderedListConstructorResultType.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/OrderedListConstructorResultType.java
@@ -3,6 +3,7 @@
 import java.util.ArrayList;
 
 import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
+import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
 import edu.uci.ics.asterix.om.types.AOrderedListType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.AUnionType;
@@ -23,8 +24,9 @@
     public AOrderedListType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
             IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
         AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+        boolean openType = TypeComputerUtilities.isOpenType(f);
         int n = f.getArguments().size();
-        if (n == 0) {
+        if (n == 0 || openType) {
             return new AOrderedListType(BuiltinType.ANY, null);
         } else {
             ArrayList<IAType> types = new ArrayList<IAType>();
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/UnorderedListConstructorResultType.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/UnorderedListConstructorResultType.java
index 9ffb77f..18c6168 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/UnorderedListConstructorResultType.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/UnorderedListConstructorResultType.java
@@ -3,6 +3,7 @@
 import java.util.ArrayList;
 
 import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
+import edu.uci.ics.asterix.om.typecomputer.base.TypeComputerUtilities;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.AUnionType;
 import edu.uci.ics.asterix.om.types.AUnorderedListType;
@@ -23,8 +24,9 @@
     public AUnorderedListType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
             IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
         AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+        boolean openType = TypeComputerUtilities.isOpenType(f);
         int n = f.getArguments().size();
-        if (n == 0) {
+        if (n == 0 || openType) {
             return new AUnorderedListType(BuiltinType.ANY, null);
         } else {
             ArrayList<IAType> types = new ArrayList<IAType>();