[NO ISSUE][COMP] Support for running queries during optimization

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
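- Provide support for running helper queries during optimization:
  ICompilerFactory gains a createCompiler() overload that takes a
  caller-supplied optimization context and an IRuleSetKind
- Introduce a new logical ruleset for running sampling queries; logical and
  physical rewrites are now handed to the compiler factory builder as
  Supplier/Function instances instead of prebuilt lists
- Pass IAWriterFactory and IResultSerializerFactoryProvider through
  JobGenContext and IMetadataProvider so that helper queries can be compiled
  with a different result writer (SerializedDataWriterFactory)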

Change-Id: I457063fef269ae00947169d663d828488c67c2ee
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16143
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17345
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml b/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml
index fbe9dbc..f557d7a 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml
@@ -59,6 +59,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
+      <artifactId>algebricks-runtime</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-api</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
index c22d54d..3a2fe3b 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
@@ -19,6 +19,8 @@
 package org.apache.hyracks.algebricks.compiler.api;
 
 import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -32,7 +34,9 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.IPartialAggregationTypeComputer;
 import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IAWriterFactory;
 import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
@@ -40,6 +44,7 @@
 import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
 import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
+import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
 import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
 import org.apache.hyracks.algebricks.data.IUnnestingPositionWriterFactory;
@@ -49,8 +54,9 @@
 
 public abstract class AbstractCompilerFactoryBuilder {
 
-    protected List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites;
-    protected List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites;
+    protected Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewrites;
+    protected Function<IRuleSetKind, List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewritesByKind;
+    protected Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> physicalRewrites;
     protected ITypeTraitProvider typeTraitProvider;
     protected ISerializerDeserializerProvider serializerDeserializerProvider;
     protected IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
@@ -59,6 +65,8 @@
     protected IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
     protected IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
     protected IPrinterFactoryProvider printerProvider;
+    protected IAWriterFactory writerFactory;
+    protected IResultSerializerFactoryProvider resultSerializerFactoryProvider;
     protected IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
     protected IExpressionRuntimeProvider expressionRuntimeProvider;
     protected IExpressionTypeComputer expressionTypeComputer;
@@ -78,11 +86,18 @@
 
     public abstract ICompilerFactory create();
 
-    public void setLogicalRewrites(List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites) {
+    public void setLogicalRewrites(
+            Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewrites) {
         this.logicalRewrites = logicalRewrites;
     }
 
-    public void setPhysicalRewrites(List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites) {
+    public void setLogicalRewritesByKind(
+            Function<IRuleSetKind, List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewritesByKind) {
+        this.logicalRewritesByKind = logicalRewritesByKind;
+    }
+
+    public void setPhysicalRewrites(
+            Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> physicalRewrites) {
         this.physicalRewrites = physicalRewrites;
     }
 
@@ -158,6 +173,22 @@
         return printerProvider;
     }
 
+    public void setWriterFactory(IAWriterFactory writerFactory) {
+        this.writerFactory = writerFactory;
+    }
+
+    public IAWriterFactory getWriterFactory() {
+        return writerFactory;
+    }
+
+    public void setResultSerializerFactoryProvider(IResultSerializerFactoryProvider resultSerializerFactoryProvider) {
+        this.resultSerializerFactoryProvider = resultSerializerFactoryProvider;
+    }
+
+    public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
+        return resultSerializerFactoryProvider;
+    }
+
     public void setExpressionRuntimeProvider(IExpressionRuntimeProvider expressionRuntimeProvider) {
         this.expressionRuntimeProvider = expressionRuntimeProvider;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 891980f..e35a539 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -18,8 +18,11 @@
  */
 package org.apache.hyracks.algebricks.compiler.api;
 
+import java.util.List;
+
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IConflictingTypeResolver;
@@ -33,10 +36,15 @@
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.PlanCompiler;
+import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
 import org.apache.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext;
 import org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
+import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IAWriterFactory;
+import org.apache.hyracks.algebricks.runtime.writers.SerializedDataWriterFactory;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -58,14 +66,19 @@
                 IConflictingTypeResolver conflictingTypeResolver, PhysicalOptimizationConfig physicalOptimizationConfig,
                 AlgebricksPartitionConstraint clusterLocations, IWarningCollector warningCollector) {
             IPlanPrettyPrinter prettyPrinter = PlanPrettyPrinter.createStringPlanPrettyPrinter();
-            return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer,
+            return new AlgebricksOptimizationContext(this, varCounter, expressionEvalSizeComputer,
                     mergeAggregationExpressionFactory, expressionTypeComputer, missableTypeComputer,
                     conflictingTypeResolver, physicalOptimizationConfig, clusterLocations, prettyPrinter,
                     warningCollector);
         }
+
+        @Override
+        public IOptimizationContext cloneOptimizationContext(IOptimizationContext oc) {
+            return new AlgebricksOptimizationContext((AlgebricksOptimizationContext) oc);
+        }
     }
 
-    private IOptimizationContextFactory optCtxFactory;
+    private final IOptimizationContextFactory optCtxFactory;
 
     public HeuristicCompilerFactoryBuilder() {
         this.optCtxFactory = DefaultOptimizationContextFactory.INSTANCE;
@@ -77,42 +90,85 @@
 
     @Override
     public ICompilerFactory create() {
-        return new ICompilerFactory() {
-            @Override
-            public ICompiler createCompiler(final ILogicalPlan plan, final IMetadataProvider<?, ?> metadata,
-                    int varCounter) {
-                final IOptimizationContext oc = optCtxFactory.createOptimizationContext(varCounter,
-                        expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
-                        missableTypeComputer, conflictingTypeResolver, physicalOptimizationConfig, clusterLocations,
-                        warningCollector);
-                oc.setMetadataDeclarations(metadata);
-                final HeuristicOptimizer opt = new HeuristicOptimizer(plan, logicalRewrites, physicalRewrites, oc);
-                return new ICompiler() {
-
-                    @Override
-                    public void optimize() throws AlgebricksException {
-                        opt.optimize();
-                    }
-
-                    @Override
-                    public JobSpecification createJob(Object appContext,
-                            IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
-                        AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting Job Generation.\n");
-                        JobGenContext context = new JobGenContext(null, metadata, appContext,
-                                serializerDeserializerProvider, hashFunctionFactoryProvider, hashFunctionFamilyProvider,
-                                comparatorFactoryProvider, typeTraitProvider, binaryBooleanInspectorFactory,
-                                binaryIntegerInspectorFactory, printerProvider, missingWriterFactory, nullWriterFactory,
-                                unnestingPositionWriterFactory, normalizedKeyComputerFactoryProvider,
-                                expressionRuntimeProvider, expressionTypeComputer, oc, expressionEvalSizeComputer,
-                                partialAggregationTypeComputer, predEvaluatorFactoryProvider,
-                                physicalOptimizationConfig.getFrameSize(), clusterLocations, warningCollector,
-                                maxWarnings, physicalOptimizationConfig);
-                        PlanCompiler pc = new PlanCompiler(context);
-                        return pc.compilePlan(plan, jobEventListenerFactory);
-                    }
-                };
-            }
-        };
+        return new CompilerFactoryImpl();
     }
 
+    private class CompilerFactoryImpl implements ICompilerFactory {
+        @Override
+        public ICompiler createCompiler(ILogicalPlan plan, IMetadataProvider<?, ?> metadata, int varCounter) {
+            IOptimizationContext optContext =
+                    optCtxFactory.createOptimizationContext(varCounter, expressionEvalSizeComputer,
+                            mergeAggregationExpressionFactory, expressionTypeComputer, missableTypeComputer,
+                            conflictingTypeResolver, physicalOptimizationConfig, clusterLocations, warningCollector);
+            optContext.setMetadataDeclarations(metadata);
+            optContext.setCompilerFactory(this);
+            return new CompilerImpl(this, plan, optContext, logicalRewrites.get(), physicalRewrites.get(),
+                    writerFactory);
+        }
+
+        @Override
+        public ICompiler createCompiler(ILogicalPlan plan, IOptimizationContext newOptContext,
+                IRuleSetKind ruleSetKind) {
+            if (newOptContext.getCompilerFactory() != this) {
+                throw new IllegalStateException();
+            }
+            return new CompilerImpl(this, plan, newOptContext, logicalRewritesByKind.apply(ruleSetKind),
+                    physicalRewrites.get(), SerializedDataWriterFactory.WITHOUT_RECORD_DESCRIPTOR);
+        }
+
+        private PlanCompiler createPlanCompiler(IOptimizationContext oc, Object appContext,
+                IAWriterFactory writerFactory) {
+            JobGenContext context = new JobGenContext(null, oc.getMetadataProvider(), appContext,
+                    serializerDeserializerProvider, hashFunctionFactoryProvider, hashFunctionFamilyProvider,
+                    comparatorFactoryProvider, typeTraitProvider, binaryBooleanInspectorFactory,
+                    binaryIntegerInspectorFactory, printerProvider, writerFactory, resultSerializerFactoryProvider,
+                    missingWriterFactory, nullWriterFactory, unnestingPositionWriterFactory,
+                    normalizedKeyComputerFactoryProvider, expressionRuntimeProvider, expressionTypeComputer, oc,
+                    expressionEvalSizeComputer, partialAggregationTypeComputer, predEvaluatorFactoryProvider,
+                    physicalOptimizationConfig.getFrameSize(), clusterLocations, warningCollector, maxWarnings,
+                    physicalOptimizationConfig);
+            return new PlanCompiler(context);
+        }
+    }
+
+    private static class CompilerImpl implements ICompiler {
+
+        private final CompilerFactoryImpl factory;
+
+        private final ILogicalPlan plan;
+
+        private final IOptimizationContext oc;
+
+        private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites;
+
+        private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites;
+
+        private final IAWriterFactory writerFactory;
+
+        private CompilerImpl(CompilerFactoryImpl factory, ILogicalPlan plan, IOptimizationContext oc,
+                List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites,
+                List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites,
+                IAWriterFactory writerFactory) {
+            this.factory = factory;
+            this.plan = plan;
+            this.oc = oc;
+            this.logicalRewrites = logicalRewrites;
+            this.physicalRewrites = physicalRewrites;
+            this.writerFactory = writerFactory;
+        }
+
+        @Override
+        public void optimize() throws AlgebricksException {
+            HeuristicOptimizer opt = new HeuristicOptimizer(plan, logicalRewrites, physicalRewrites, oc);
+            opt.optimize();
+        }
+
+        @Override
+        public JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobEventListenerFactory)
+                throws AlgebricksException {
+            AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting Job Generation.\n");
+            PlanCompiler pc = factory.createPlanCompiler(oc, appContext, writerFactory);
+            return pc.compilePlan(plan, jobEventListenerFactory);
+        }
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java
index 7c138ea..07a8034 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java
@@ -19,8 +19,12 @@
 package org.apache.hyracks.algebricks.compiler.api;
 
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind;
 
 public interface ICompilerFactory {
     ICompiler createCompiler(ILogicalPlan plan, IMetadataProvider<?, ?> metadata, int varCounter);
+
+    ICompiler createCompiler(ILogicalPlan plan, IOptimizationContext newOptContext, IRuleSetKind ruleSetKind);
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/Counter.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/Counter.java
index 7f2d3c8..3982171 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/Counter.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/Counter.java
@@ -32,10 +32,18 @@
         return counter;
     }
 
+    public int getAndInc() {
+        return counter++;
+    }
+
     public void inc() {
         ++counter;
     }
 
+    public int incAndGet() {
+        return ++counter;
+    }
+
     public void set(int newStart) {
         counter = newStart;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 166ab9a..69ec210 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -33,6 +33,7 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
@@ -93,4 +94,10 @@
     public PlanStructureVerifier getPlanStructureVerifier();
 
     public PlanStabilityVerifier getPlanStabilityVerifier();
+
+    void setCompilerFactory(Object factory);
+
+    Object getCompilerFactory();
+
+    IOptimizationContextFactory getOptimizationContextFactory();
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 77fdd74..d350789 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -31,7 +31,9 @@
 import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.data.IAWriterFactory;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
+import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
@@ -56,11 +58,12 @@
             IProjectionInfo<?> projectionInfo) throws AlgebricksException;
 
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
-            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
-            throws AlgebricksException;
+            int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
+            RecordDescriptor inputDesc) throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
-            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc,
+            int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
+            IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc,
             IResultMetadata metadata, JobSpecification spec) throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index 138cff8..1f7c16c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -102,8 +102,9 @@
         IPrinterFactory[] pf =
                 JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op), context, columns);
 
-        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
-                resultOp.getDataSink(), columns, pf, inputDesc, resultOp.getResultMetadata(), spec);
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints =
+                mp.getResultHandleRuntime(resultOp.getDataSink(), columns, pf, context.getWriterFactory(),
+                        context.getResultSerializerFactoryProvider(), inputDesc, resultOp.getResultMetadata(), spec);
         IOperatorDescriptor opDesc = runtimeAndConstraints.first;
         opDesc.setSourceLocation(resultOp.getSourceLocation());
         builder.contributeHyracksOperator(resultOp, opDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index 3521a27..07c798f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -101,7 +101,7 @@
         IMetadataProvider<?, ?> mp = context.getMetadataProvider();
 
         Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtimeAndConstraints =
-                mp.getWriteFileRuntime(write.getDataSink(), columns, pf, inputDesc);
+                mp.getWriteFileRuntime(write.getDataSink(), columns, pf, context.getWriterFactory(), inputDesc);
         IPushRuntimeFactory runtime = runtimeAndConstraints.first;
         runtime.setSourceLocation(write.getSourceLocation());
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index 7c7d5a8..471380c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -35,6 +35,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IAWriterFactory;
 import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
@@ -42,6 +43,7 @@
 import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
 import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
+import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
 import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
 import org.apache.hyracks.algebricks.data.IUnnestingPositionWriterFactory;
@@ -57,6 +59,8 @@
     private final IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
     private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
     private final IPrinterFactoryProvider printerFactoryProvider;
+    private final IAWriterFactory writerFactory;
+    private final IResultSerializerFactoryProvider resultSerializerFactoryProvider;
     private final ITypeTraitProvider typeTraitProvider;
     private final IMetadataProvider<?, ?> metadataProvider;
     private final IMissingWriterFactory missingWriterFactory;
@@ -86,6 +90,7 @@
             IBinaryComparatorFactoryProvider comparatorFactoryProvider, ITypeTraitProvider typeTraitProvider,
             IBinaryBooleanInspectorFactory booleanInspectorFactory,
             IBinaryIntegerInspectorFactory integerInspectorFactory, IPrinterFactoryProvider printerFactoryProvider,
+            IAWriterFactory writerFactory, IResultSerializerFactoryProvider resultSerializerFactoryProvider,
             IMissingWriterFactory missingWriterFactory, IMissingWriterFactory nullWriterFactory,
             IUnnestingPositionWriterFactory unnestingPositionWriterFactory,
             INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
@@ -106,6 +111,8 @@
         this.booleanInspectorFactory = booleanInspectorFactory;
         this.integerInspectorFactory = integerInspectorFactory;
         this.printerFactoryProvider = printerFactoryProvider;
+        this.writerFactory = writerFactory;
+        this.resultSerializerFactoryProvider = resultSerializerFactoryProvider;
         this.clusterLocations = clusterLocations;
         this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
         this.missingWriterFactory = missingWriterFactory;
@@ -172,6 +179,14 @@
         return printerFactoryProvider;
     }
 
+    public IAWriterFactory getWriterFactory() {
+        return writerFactory;
+    }
+
+    public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() {
+        return resultSerializerFactoryProvider;
+    }
+
     public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactoryProvider() {
         return predEvaluatorFactoryProvider;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index d1fd247..bee842c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -46,45 +46,29 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
 
 /**
  * The Algebricks default implementation for IOptimizationContext.
  */
-@SuppressWarnings({ "unchecked", "rawtypes" })
+@SuppressWarnings({ "rawtypes" })
 public class AlgebricksOptimizationContext implements IOptimizationContext {
 
-    private int varCounter;
+    private final IOptimizationContextFactory optContextFactory;
     private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
     private final IMergeAggregationExpressionFactory mergeAggregationExpressionFactory;
     private final PhysicalOptimizationConfig physicalOptimizationConfig;
-    private final IVariableEvalSizeEnvironment varEvalSizeEnv = new IVariableEvalSizeEnvironment() {
 
-        Map<LogicalVariable, Integer> varSizeMap = new HashMap<>();
-
-        @Override
-        public void setVariableEvalSize(LogicalVariable var, int size) {
-            varSizeMap.put(var, size);
-        }
-
-        @Override
-        public int getVariableEvalSize(LogicalVariable var) {
-            return varSizeMap.get(var);
-        }
-    };
-
-    private Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<>();
-
-    private Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<>();
-    private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<>();
-    private Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<>();
-
-    private IMetadataProvider metadataProvider;
-    private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<>();
-
+    private final VariableEvalSizeEnvironmentImpl varEvalSizeEnv = new VariableEvalSizeEnvironmentImpl();
+    private final Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<>();
+    private final Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<>();
+    private final Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<>();
+    private final Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<>();
+    private final HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<>();
     protected final Map<ILogicalOperator, List<FunctionalDependency>> fdGlobalMap = new HashMap<>();
     protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<>();
-
     protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<>();
+
     private final IExpressionTypeComputer expressionTypeComputer;
     private final IMissableTypeComputer nullableTypeComputer;
     private final INodeDomain defaultNodeDomain;
@@ -94,12 +78,18 @@
     private final PlanStructureVerifier planStructureVerifier;
     private final PlanStabilityVerifier planStabilityVerifier;
 
-    public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
+    private int varCounter;
+    private IMetadataProvider metadataProvider;
+    private Object compilerFactory;
+
+    public AlgebricksOptimizationContext(IOptimizationContextFactory optContextFactory, int varCounter,
+            IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
             IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer nullableTypeComputer,
             IConflictingTypeResolver conflictingTypeResovler, PhysicalOptimizationConfig physicalOptimizationConfig,
             AlgebricksPartitionConstraint clusterLocations, IPlanPrettyPrinter prettyPrinter,
             IWarningCollector warningCollector) {
+        this.optContextFactory = optContextFactory;
         this.varCounter = varCounter;
         this.expressionEvalSizeComputer = expressionEvalSizeComputer;
         this.mergeAggregationExpressionFactory = mergeAggregationExpressionFactory;
@@ -115,6 +105,35 @@
         this.planStabilityVerifier = isSanityCheckEnabled ? new PlanStabilityVerifier(prettyPrinter) : null;
     }
 
+    public AlgebricksOptimizationContext(AlgebricksOptimizationContext from) { // copy constructor
+        optContextFactory = from.optContextFactory;
+        varCounter = from.varCounter;
+        expressionEvalSizeComputer = from.expressionEvalSizeComputer;
+        mergeAggregationExpressionFactory = from.mergeAggregationExpressionFactory;
+        expressionTypeComputer = from.expressionTypeComputer;
+        nullableTypeComputer = from.nullableTypeComputer;
+        physicalOptimizationConfig = from.physicalOptimizationConfig;
+        defaultNodeDomain = from.defaultNodeDomain;
+        prettyPrinter = from.prettyPrinter;
+        conflictingTypeResovler = from.conflictingTypeResovler;
+        warningCollector = NoOpWarningCollector.INSTANCE; // not from's collector: the copy drops warnings
+        boolean isSanityCheckEnabled = physicalOptimizationConfig.isSanityCheckEnabled();
+        planStructureVerifier = isSanityCheckEnabled ? new PlanStructureVerifier(from.prettyPrinter, this) : null;
+        planStabilityVerifier = isSanityCheckEnabled ? new PlanStabilityVerifier(from.prettyPrinter) : null;
+        metadataProvider = from.metadataProvider;
+        compilerFactory = from.compilerFactory;
+        // Collections are copied entry by entry (putAll/addAll); the keys and values themselves stay shared.
+        varEvalSizeEnv.varSizeMap.putAll(from.varEvalSizeEnv.varSizeMap);
+        typeEnvMap.putAll(from.typeEnvMap);
+        alreadyCompared.putAll(from.alreadyCompared);
+        dontApply.putAll(from.dontApply);
+        varToPrimaryKey.putAll(from.varToPrimaryKey);
+        notToBeInlinedVars.addAll(from.notToBeInlinedVars);
+        fdGlobalMap.putAll(from.fdGlobalMap);
+        eqClassGlobalMap.putAll(from.eqClassGlobalMap);
+        logicalProps.putAll(from.logicalProps);
+    }
+
     @Override
     public int getVarCounter() {
         return varCounter;
@@ -354,4 +373,34 @@
     public PlanStabilityVerifier getPlanStabilityVerifier() {
         return planStabilityVerifier;
     }
+
+    @Override
+    public IOptimizationContextFactory getOptimizationContextFactory() {
+        return optContextFactory; // factory handed to the constructor, or copied from the source context
+    }
+
+    @Override
+    public void setCompilerFactory(Object compilerFactory) {
+        this.compilerFactory = compilerFactory; // kept as a plain Object; this class only stores it
+    }
+
+    @Override
+    public Object getCompilerFactory() {
+        return compilerFactory; // presumably null until setCompilerFactory() runs or a copy inherits it - confirm
+    }
+
+    protected static class VariableEvalSizeEnvironmentImpl implements IVariableEvalSizeEnvironment {
+        // Evaluation size recorded per logical variable; the copy constructor reads this map directly.
+        protected final Map<LogicalVariable, Integer> varSizeMap = new HashMap<>();
+
+        @Override
+        public void setVariableEvalSize(LogicalVariable var, int size) {
+            varSizeMap.put(var, Integer.valueOf(size));
+        }
+
+        @Override
+        public int getVariableEvalSize(LogicalVariable var) {
+            return varSizeMap.get(var).intValue(); // NullPointerException if no size was recorded for var
+        }
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
index 1c41e9a..c81025d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
@@ -28,10 +28,12 @@
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
 public interface IOptimizationContextFactory {
-    public IOptimizationContext createOptimizationContext(int varCounter,
+    IOptimizationContext createOptimizationContext(int varCounter,
             IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
             IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer missableTypeComputer,
             IConflictingTypeResolver conflictintTypeResolver, PhysicalOptimizationConfig physicalOptimizationConfig,
             AlgebricksPartitionConstraint clusterLocations, IWarningCollector warningCollector);
+
+    IOptimizationContext cloneOptimizationContext(IOptimizationContext oc); // presumably a copy of oc - confirm
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IRuleSetKind.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IRuleSetKind.java
new file mode 100644
index 0000000..5ce61ec
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IRuleSetKind.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.rewriter.base;
+
+public interface IRuleSetKind {
+    String name(); // same signature as Enum.name(), so an enum can implement this without extra code
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IResultSerializerFactoryProvider.java b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
index 7525180..75fd791 100644
--- a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
@@ -32,10 +32,8 @@
      *            - A printer factory array to print the tuple containing different fields.
      * @param writerFactory
      *            - A writer factory to write the serialized data to the print stream.
-     * @param inputRecordDesc
-     *            - The record descriptor describing the input frame to be serialized.
      * @return A new instance of result serialized appender.
      */
-    public IResultSerializerFactory getAqlResultSerializerFactoryProvider(int[] fields,
-            IPrinterFactory[] printerFactories, IAWriterFactory writerFactory);
+    public IResultSerializerFactory getResultSerializerFactoryProvider(int[] fields, IPrinterFactory[] printerFactories,
+            IAWriterFactory writerFactory);
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PopulateResultMetadataRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PopulateResultMetadataRule.java
index e1b2e5d..1eda133 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PopulateResultMetadataRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PopulateResultMetadataRule.java
@@ -47,7 +47,7 @@
         }
         DistributeResultOperator dop = (DistributeResultOperator) op;
         IResultMetadata resultMetadata = dop.getResultMetadata();
-        if (resultMetadata.getOutputTypes() != null) {
+        if (resultMetadata == null || resultMetadata.getOutputTypes() != null) {
             return false;
         }
         List<Mutable<ILogicalExpression>> exprList = dop.getExpressions();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java
index 763e6ff..90fa824 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java
@@ -40,7 +40,7 @@
     }
 
     @Override
-    public IResultSerializerFactory getAqlResultSerializerFactoryProvider(final int[] fields,
+    public IResultSerializerFactory getResultSerializerFactoryProvider(final int[] fields,
             final IPrinterFactory[] printerFactories, final IAWriterFactory writerFactory) {
         return new IResultSerializerFactory() {
             private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java
index bc7634d..8ce4f2b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java
@@ -32,7 +32,15 @@
 
 public class SerializedDataWriterFactory implements IAWriterFactory {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
+
+    public static final SerializedDataWriterFactory WITHOUT_RECORD_DESCRIPTOR = new SerializedDataWriterFactory(false);
+
+    private final boolean writeRecordDescriptor;
+
+    public SerializedDataWriterFactory(boolean writeRecordDescriptor) {
+        this.writeRecordDescriptor = writeRecordDescriptor; // when false, the writer's init() writes nothing
+    }
 
     @Override
     public IAWriter createWriter(final int[] fields, final PrintStream ps, IPrinterFactory[] printerFactories,
@@ -41,15 +49,17 @@
 
             @Override
             public void init() throws HyracksDataException {
-                // dump the SerializerDeserializers to disk
-                try {
-                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                    ObjectOutputStream oos = new ObjectOutputStream(baos);
-                    oos.writeObject(inputRecordDescriptor);
-                    baos.writeTo(ps);
-                    oos.close();
-                } catch (IOException e) {
-                    throw HyracksDataException.create(e);
+                if (writeRecordDescriptor) {
+                    // Serialize the input record descriptor and write it to ps.
+                    // flush() first: the object bytes must be in baos before baos is copied out.
+                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+                        oos.writeObject(inputRecordDescriptor);
+                        oos.flush();
+                        baos.writeTo(ps);
+                    } catch (IOException e) {
+                        throw HyracksDataException.create(e);
+                    }
                 }
             }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NoOpWarningCollector.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NoOpWarningCollector.java
new file mode 100644
index 0000000..caf2464
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NoOpWarningCollector.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.exceptions;
+
+public final class NoOpWarningCollector implements IWarningCollector {
+    // Collector that ignores every warning; it has no fields, and the private constructor leaves INSTANCE as the only one.
+    public static final IWarningCollector INSTANCE = new NoOpWarningCollector();
+
+    private NoOpWarningCollector() {
+    }
+
+    @Override
+    public void warn(Warning warning) {
+        // no-op: the warning is discarded
+    }
+
+    @Override
+    public boolean shouldWarn() {
+        return false;
+    }
+
+    @Override
+    public long getTotalWarningsCount() {
+        return 0; // nothing is ever recorded, so the count never changes
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index 3f78234..71c81a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -38,7 +38,7 @@
 import org.apache.hyracks.api.dataflow.TaskId;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.exceptions.NoOpWarningCollector;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.util.CleanupUtils;
@@ -52,22 +52,7 @@
 public class TestUtils {
 
     private static final int DEFAULT_FRAME_SIZE = 32768;
-    public static final IWarningCollector NOOP_WARNING_COLLECTOR = new IWarningCollector() {
-        @Override
-        public void warn(Warning warning) {
-            // no-op
-        }
-
-        @Override
-        public boolean shouldWarn() {
-            return false;
-        }
-
-        @Override
-        public long getTotalWarningsCount() {
-            return 0;
-        }
-    };
+    public static final IWarningCollector NOOP_WARNING_COLLECTOR = NoOpWarningCollector.INSTANCE;
 
     public static IHyracksTaskContext createHyracksTask() {
         return create(DEFAULT_FRAME_SIZE);