[ASTERIXDB-3249][COMP] Transform SOME AND EVERY IN query to SOME IN query for selectivity estimation

Change-Id: Iee8144ffa3f749d1c96bbb63e56cf970561f19d6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17736
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Vijay Sarathy <vijay.sarathy@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/array/AbstractOperatorFromSubplanRewrite.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/array/AbstractOperatorFromSubplanRewrite.java
index 1aa2bee..1611bda 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/array/AbstractOperatorFromSubplanRewrite.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/array/AbstractOperatorFromSubplanRewrite.java
@@ -310,7 +310,7 @@
         return combinedCondition.cloneExpression();
     }
 
-    private SelectOperator getSelectFromPlan(AggregateOperator subplanRoot) {
+    public static SelectOperator getSelectFromPlan(AggregateOperator subplanRoot) {
         ILogicalExpression aggregateCondition = null;
         boolean isNonEmptyStream = false;
         for (Mutable<ILogicalExpression> expression : subplanRoot.getExpressions()) {
@@ -345,7 +345,8 @@
         if (isNonEmptyStream && aggregateCondition != null) {
             SelectOperator selectFromAgg = new SelectOperator(new MutableObject<>(aggregateCondition));
             selectFromAgg.getInputs().addAll(subplanRoot.getInputs());
-            selectFromAgg.setSourceLocation(sourceLocation);
+            selectFromAgg.setSourceLocation(subplanRoot.getSourceLocation());
+
             return selectFromAgg;
         }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
index 13082f2..d09783d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
@@ -35,6 +35,7 @@
 import org.apache.asterix.om.functions.BuiltinFunctionInfo;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.optimizer.base.AnalysisUtil;
+import org.apache.asterix.optimizer.rules.am.array.AbstractOperatorFromSubplanRewrite;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -268,15 +269,31 @@
         return sel;
     }
 
-    private int countOps(ILogicalOperator op, LogicalOperatorTag tag) {
-        int count = 0;
+    private List<ILogicalOperator> countOps(ILogicalOperator op, LogicalOperatorTag tag) {
+        List<ILogicalOperator> ops = new ArrayList<>();
+        // Gather each operator matching tag on the first-input chain; the walk stops at EMPTYTUPLESOURCE.
         while (op != null && op.getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
             if (op.getOperatorTag().equals(tag)) {
-                count++;
+                ops.add(op);
             }
             op = op.getInputs().get(0).getValue();
         }
-        return count;
+        return ops;
+    }
+
+    private AggregateOperator findAggOp(ILogicalOperator op, ILogicalExpression exp) throws AlgebricksException {
+        // Returns the AGGREGATE root of the first SUBPLAN (on op's input chain) that has one, else null; exp is unused.
+        while (op != null && op.getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
+            if (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+                SubplanOperator subOp = (SubplanOperator) op;
+                ILogicalOperator nextOp = subOp.getNestedPlans().get(0).getRoots().get(0).getValue();
+                if (nextOp.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+                    return (AggregateOperator) nextOp;
+                }
+            }
+            op = op.getInputs().get(0).getValue();
+        }
+        return null;
     }
 
     private SubplanOperator findSubplanWithExpr(ILogicalOperator op, ILogicalExpression exp)
@@ -417,8 +434,8 @@
                 (DataSourceScanOperator) OperatorManipulationUtil.bottomUpCopyOperators(scanOp);
         deepCopyofScan.setDataSource(sampledatasource);
 
-        int numSubplans = countOps(selOp, LogicalOperatorTag.SUBPLAN);
-
+        List<ILogicalOperator> subPlans = countOps(selOp, LogicalOperatorTag.SUBPLAN);
+        int numSubplans = subPlans.size();
         List<List<IAObject>> result;
 
         // insert this in place of the scandatasourceOp.
@@ -429,11 +446,30 @@
             result = runSamplingQuery(optCtx, selOp);
             selOp.getCondition().setValue(saveExprs);
         } else {
-            int numSelects = countOps(selOp, LogicalOperatorTag.SELECT);
+            List<ILogicalOperator> selOps = countOps(selOp, LogicalOperatorTag.SELECT);
+            int numSelects = selOps.size();
             int nonSubplanSelects = numSelects - numSubplans;
 
             if (numSubplans == 1 && nonSubplanSelects == 0) {
-                result = runSamplingQuery(optCtx, selOp); // no need to switch anything
+                AggregateOperator aggOp = findAggOp(selOp, exp);
+                if (aggOp.getExpressions().size() > 1) {
+                    // ANY and EVERY IN query; for selectivity purposes, we must transform this into an ANY IN query
+                    SelectOperator newSelOp = (SelectOperator) OperatorManipulationUtil.bottomUpCopyOperators(selOp);
+                    aggOp = findAggOp(newSelOp, exp);
+                    ILogicalOperator input = aggOp.getInputs().get(0).getValue();
+                    SelectOperator condition = (SelectOperator) OperatorManipulationUtil
+                            .bottomUpCopyOperators(AbstractOperatorFromSubplanRewrite.getSelectFromPlan(aggOp));
+                    //push this condition below aggOp.
+                    aggOp.getInputs().get(0).setValue(condition);
+                    condition.getInputs().get(0).setValue(input);
+                    ILogicalExpression newExp2 = newSelOp.getCondition().getValue();
+                    if (newExp2.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                        AbstractFunctionCallExpression afce = (AbstractFunctionCallExpression) newExp2;
+                        afce.getArguments().get(1).setValue(ConstantExpression.TRUE);
+                    }
+                    result = runSamplingQuery(optCtx, newSelOp); // sample the rewritten ANY IN copy
+                } else
+                    result = runSamplingQuery(optCtx, selOp);
             } else { // the painful part; have to find where exp that is passed in is coming from. >= 1 and >= 1 case
                 // Assumption is that there is exaclty one select condition above each subplan.
                 // This was ensured before this routine is called
@@ -484,6 +520,9 @@
         return sel;
     }
 
+    private void transformtoAnyInPlan(SelectOperator newSelOp) { // NOTE(review): empty stub, no visible caller
+    }
+
     protected List<List<IAObject>> runSamplingQuery(IOptimizationContext ctx, ILogicalOperator logOp)
             throws AlgebricksException {
         LOGGER.info("***running sample query***");