[NO ISSUE][COMP] Support for running queries during optimization

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Provide support for running helper queries during optimization
- Introduce a new logical ruleset for running sampling queries

Change-Id: I457063fef269ae00947169d663d828488c67c2ee
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16143
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17345
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml b/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml
index fbe9dbc..f557d7a 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml
@@ -59,6 +59,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
+      <artifactId>algebricks-runtime</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-api</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
index c22d54d..3a2fe3b 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
@@ -19,6 +19,8 @@
 package org.apache.hyracks.algebricks.compiler.api;
 
 import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -32,7 +34,9 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.IPartialAggregationTypeComputer;
 import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IAWriterFactory;
 import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
@@ -40,6 +44,7 @@
 import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
 import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
+import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
 import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
 import org.apache.hyracks.algebricks.data.IUnnestingPositionWriterFactory;
@@ -49,8 +54,9 @@
 
 public abstract class AbstractCompilerFactoryBuilder {
 
-    protected List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites;
-    protected List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites;
+    protected Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewrites;
+    protected Function<IRuleSetKind, List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewritesByKind;
+    protected Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> physicalRewrites;
     protected ITypeTraitProvider typeTraitProvider;
     protected ISerializerDeserializerProvider serializerDeserializerProvider;
     protected IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
@@ -59,6 +65,8 @@
     protected IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
     protected IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
     protected IPrinterFactoryProvider printerProvider;
+    protected IAWriterFactory writerFactory;
+    protected IResultSerializerFactoryProvider resultSerializerFactoryProvider;
     protected IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
     protected IExpressionRuntimeProvider expressionRuntimeProvider;
     protected IExpressionTypeComputer expressionTypeComputer;
@@ -78,11 +86,18 @@
 
     public abstract ICompilerFactory create();
 
-    public void setLogicalRewrites(List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites) {
+    public void setLogicalRewrites(
+            Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewrites) {
         this.logicalRewrites = logicalRewrites;
     }
 
-    public void setPhysicalRewrites(List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites) {
+    public void setLogicalRewritesByKind(
+            Function<IRuleSetKind, List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewritesByKind) {
+        this.logicalRewritesByKind = logicalRewritesByKind;
+    }
+
+    public void setPhysicalRewrites(
+            Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> physicalRewrites) {
         this.physicalRewrites = physicalRewrites;
     }
 
@@ -158,6 +173,22 @@
         return printerProvider;
     }
 
+    public void setWriterFactory(IAWriterFactory writerFactory) {
+        this.writerFactory = writerFactory;
+    }
+
+    public IAWriterFactory getWriterFactory() {
+        return writerFactory;
+    }
+
+    public void setResultSerializerFactoryProvider(IResultSerializerFactoryProvider resultSerializerFactoryProvider) {
+        this.resultSerializerFactoryProvider = resultSerializerFactoryProvider;
+    }
+
+    public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
+        return resultSerializerFactoryProvider;
+    }
+
     public void setExpressionRuntimeProvider(IExpressionRuntimeProvider expressionRuntimeProvider) {
         this.expressionRuntimeProvider = expressionRuntimeProvider;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 891980f..e35a539 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -18,8 +18,11 @@
  */
 package org.apache.hyracks.algebricks.compiler.api;
 
+import java.util.List;
+
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IConflictingTypeResolver;
@@ -33,10 +36,15 @@
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.PlanCompiler;
+import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
 import org.apache.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext;
 import org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
+import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IAWriterFactory;
+import org.apache.hyracks.algebricks.runtime.writers.SerializedDataWriterFactory;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -58,14 +66,19 @@
                 IConflictingTypeResolver conflictingTypeResolver, PhysicalOptimizationConfig physicalOptimizationConfig,
                 AlgebricksPartitionConstraint clusterLocations, IWarningCollector warningCollector) {
             IPlanPrettyPrinter prettyPrinter = PlanPrettyPrinter.createStringPlanPrettyPrinter();
-            return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer,
+            return new AlgebricksOptimizationContext(this, varCounter, expressionEvalSizeComputer,
                     mergeAggregationExpressionFactory, expressionTypeComputer, missableTypeComputer,
                     conflictingTypeResolver, physicalOptimizationConfig, clusterLocations, prettyPrinter,
                     warningCollector);
         }
+
+        @Override
+        public IOptimizationContext cloneOptimizationContext(IOptimizationContext oc) {
+            return new AlgebricksOptimizationContext((AlgebricksOptimizationContext) oc);
+        }
     }
 
-    private IOptimizationContextFactory optCtxFactory;
+    private final IOptimizationContextFactory optCtxFactory;
 
     public HeuristicCompilerFactoryBuilder() {
         this.optCtxFactory = DefaultOptimizationContextFactory.INSTANCE;
@@ -77,42 +90,85 @@
 
     @Override
     public ICompilerFactory create() {
-        return new ICompilerFactory() {
-            @Override
-            public ICompiler createCompiler(final ILogicalPlan plan, final IMetadataProvider<?, ?> metadata,
-                    int varCounter) {
-                final IOptimizationContext oc = optCtxFactory.createOptimizationContext(varCounter,
-                        expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
-                        missableTypeComputer, conflictingTypeResolver, physicalOptimizationConfig, clusterLocations,
-                        warningCollector);
-                oc.setMetadataDeclarations(metadata);
-                final HeuristicOptimizer opt = new HeuristicOptimizer(plan, logicalRewrites, physicalRewrites, oc);
-                return new ICompiler() {
-
-                    @Override
-                    public void optimize() throws AlgebricksException {
-                        opt.optimize();
-                    }
-
-                    @Override
-                    public JobSpecification createJob(Object appContext,
-                            IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
-                        AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting Job Generation.\n");
-                        JobGenContext context = new JobGenContext(null, metadata, appContext,
-                                serializerDeserializerProvider, hashFunctionFactoryProvider, hashFunctionFamilyProvider,
-                                comparatorFactoryProvider, typeTraitProvider, binaryBooleanInspectorFactory,
-                                binaryIntegerInspectorFactory, printerProvider, missingWriterFactory, nullWriterFactory,
-                                unnestingPositionWriterFactory, normalizedKeyComputerFactoryProvider,
-                                expressionRuntimeProvider, expressionTypeComputer, oc, expressionEvalSizeComputer,
-                                partialAggregationTypeComputer, predEvaluatorFactoryProvider,
-                                physicalOptimizationConfig.getFrameSize(), clusterLocations, warningCollector,
-                                maxWarnings, physicalOptimizationConfig);
-                        PlanCompiler pc = new PlanCompiler(context);
-                        return pc.compilePlan(plan, jobEventListenerFactory);
-                    }
-                };
-            }
-        };
+        return new CompilerFactoryImpl();
     }
 
+    private class CompilerFactoryImpl implements ICompilerFactory {
+        @Override
+        public ICompiler createCompiler(ILogicalPlan plan, IMetadataProvider<?, ?> metadata, int varCounter) {
+            IOptimizationContext optContext =
+                    optCtxFactory.createOptimizationContext(varCounter, expressionEvalSizeComputer,
+                            mergeAggregationExpressionFactory, expressionTypeComputer, missableTypeComputer,
+                            conflictingTypeResolver, physicalOptimizationConfig, clusterLocations, warningCollector);
+            optContext.setMetadataDeclarations(metadata);
+            optContext.setCompilerFactory(this); // checked by the context-based createCompiler overload
+            return new CompilerImpl(this, plan, optContext, logicalRewrites.get(), physicalRewrites.get(),
+                    writerFactory);
+        }
+
+        @Override
+        public ICompiler createCompiler(ILogicalPlan plan, IOptimizationContext newOptContext,
+                IRuleSetKind ruleSetKind) {
+            if (newOptContext.getCompilerFactory() != this) { // context must be tied to this factory
+                throw new IllegalStateException("optimization context is not associated with this compiler factory");
+            }
+            return new CompilerImpl(this, plan, newOptContext, logicalRewritesByKind.apply(ruleSetKind),
+                    physicalRewrites.get(), SerializedDataWriterFactory.WITHOUT_RECORD_DESCRIPTOR);
+        }
+
+        private PlanCompiler createPlanCompiler(IOptimizationContext oc, Object appContext,
+                IAWriterFactory writerFactory) {
+            JobGenContext context = new JobGenContext(null, oc.getMetadataProvider(), appContext,
+                    serializerDeserializerProvider, hashFunctionFactoryProvider, hashFunctionFamilyProvider,
+                    comparatorFactoryProvider, typeTraitProvider, binaryBooleanInspectorFactory,
+                    binaryIntegerInspectorFactory, printerProvider, writerFactory, resultSerializerFactoryProvider,
+                    missingWriterFactory, nullWriterFactory, unnestingPositionWriterFactory,
+                    normalizedKeyComputerFactoryProvider, expressionRuntimeProvider, expressionTypeComputer, oc,
+                    expressionEvalSizeComputer, partialAggregationTypeComputer, predEvaluatorFactoryProvider,
+                    physicalOptimizationConfig.getFrameSize(), clusterLocations, warningCollector, maxWarnings,
+                    physicalOptimizationConfig);
+            return new PlanCompiler(context);
+        }
+    }
+
+    private static class CompilerImpl implements ICompiler {
+        /** Factory that produced this compiler; supplies the plan compiler in createJob. */
+        private final CompilerFactoryImpl factory;
+        /** Logical plan handed to the optimizer and later compiled into a job. */
+        private final ILogicalPlan plan;
+        /** Optimization context passed to both the optimizer and the plan compiler. */
+        private final IOptimizationContext oc;
+        /** Logical rule collections passed to the optimizer. */
+        private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites;
+        /** Physical rule collections passed to the optimizer. */
+        private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites;
+        /** Writer factory forwarded to the plan compiler in createJob. */
+        private final IAWriterFactory writerFactory;
+
+        private CompilerImpl(CompilerFactoryImpl factory, ILogicalPlan plan, IOptimizationContext oc,
+                List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites,
+                List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites,
+                IAWriterFactory writerFactory) {
+            this.logicalRewrites = logicalRewrites;
+            this.physicalRewrites = physicalRewrites;
+            this.writerFactory = writerFactory;
+            this.factory = factory;
+            this.plan = plan;
+            this.oc = oc;
+        }
+
+        @Override
+        public void optimize() throws AlgebricksException {
+            // A new optimizer is built on every call from the state captured at construction.
+            new HeuristicOptimizer(plan, logicalRewrites, physicalRewrites, oc).optimize();
+        }
+
+        @Override
+        public JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobEventListenerFactory)
+                throws AlgebricksException {
+            AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting Job Generation.\n");
+            PlanCompiler planCompiler = factory.createPlanCompiler(oc, appContext, writerFactory);
+            return planCompiler.compilePlan(plan, jobEventListenerFactory);
+        }
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java
index 7c138ea..07a8034 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java
@@ -19,8 +19,12 @@
 package org.apache.hyracks.algebricks.compiler.api;
 
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind;
 
 public interface ICompilerFactory {
     ICompiler createCompiler(ILogicalPlan plan, IMetadataProvider<?, ?> metadata, int varCounter);
+
+    ICompiler createCompiler(ILogicalPlan plan, IOptimizationContext newOptContext, IRuleSetKind ruleSetKind);
 }