[ASTERIXDB-3313][COMP] Defuse profile when not profiling

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
The logical2physical map is time consuming to produce, so it should
not be computed unless it's needed for a profile. Likewise keeping
the last plan in memory is wasteful and useless unless a profile is
needed. Lastly in the pipeline assembler, look at the profile flag
first instead of the instanceof check to save time.

Change-Id: Ia9a84eedbcb33c29f03155a8605bb82af372f1f3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17962
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 3c0b01d..0cc2790 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -25,6 +25,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -107,6 +108,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
@@ -204,7 +206,7 @@
     public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
             Query query, int varCounter, String outputDatasetName, SessionOutput output,
             ICompiledDmlStatement statement, Map<VarIdentifier, IAObject> externalVars, IResponsePrinter printer,
-            IWarningCollector warningCollector, IRequestParameters requestParameters)
+            IWarningCollector warningCollector, IRequestParameters requestParameters, EnumSet<JobFlag> runtimeFlags)
             throws AlgebricksException, ACIDException {
 
         // establish facts
@@ -325,7 +327,7 @@
 
         JobEventListenerFactory jobEventListenerFactory =
                 new JobEventListenerFactory(txnId, metadataProvider.isWriteTransaction());
-        JobSpecification spec = compiler.createJob(ccAppContext, jobEventListenerFactory);
+        JobSpecification spec = compiler.createJob(ccAppContext, jobEventListenerFactory, runtimeFlags);
 
         if (isQuery) {
             if (!compiler.skipJobCapacityAssignment()) {
@@ -347,7 +349,10 @@
             if (isQuery || isLoad) {
                 generateOptimizedLogicalPlan(plan, spec.getLogical2PhysicalMap(), output.config().getPlanFormat(),
                         cboMode);
-                lastPlan = new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat());
+                if (runtimeFlags.contains(JobFlag.PROFILE_RUNTIME)) {
+                    lastPlan =
+                            new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat());
+                }
             }
         }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 8b635a2..e602b76 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -3663,7 +3663,7 @@
                     loadStmt.getDatasetName(), loadStmt.getAdapter(), properties, loadStmt.dataIsAlreadySorted());
             cls.setSourceLocation(stmt.getSourceLocation());
             JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
-                    null, responsePrinter, warningCollector, null);
+                    null, responsePrinter, warningCollector, null, jobFlags);
             afterCompile();
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -3794,7 +3794,7 @@
         // Query Compilation (happens under the same ongoing metadata transaction)
         return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, (Query) rewrittenResult.first,
                 rewrittenResult.second, stmt == null ? null : stmt.getDatasetName(), sessionOutput, stmt, externalVars,
-                responsePrinter, warningCollector, requestParameters);
+                responsePrinter, warningCollector, requestParameters, jobFlags);
     }
 
     protected JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector clusterInfoCollector,
@@ -3835,7 +3835,7 @@
         // transaction)
         return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, rewrittenInsertUpsert.getQuery(),
                 rewrittenResult.second, datasetName, sessionOutput, clfrqs, externalVars, responsePrinter,
-                warningCollector, null);
+                warningCollector, null, jobFlags);
     }
 
     protected void handleCreateFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 85910aa..a44e1be 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.algebricks.compiler.api;
 
+import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -47,6 +48,7 @@
 import org.apache.hyracks.algebricks.runtime.writers.SerializedDataWriterFactory;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobSpecification;
 
 public class HeuristicCompilerFactoryBuilder extends AbstractCompilerFactoryBuilder {
@@ -172,6 +174,17 @@
         }
 
         @Override
+        public JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobEventListenerFactory,
+                EnumSet<JobFlag> runtimeFlags) throws AlgebricksException {
+            AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting Job Generation.\n");
+            PlanCompiler pc = factory.createPlanCompiler(oc, appContext, writerFactory);
+            if (runtimeFlags.contains(JobFlag.PROFILE_RUNTIME)) {
+                pc.enableLog2PhysMapping();
+            }
+            return pc.compilePlan(plan, jobEventListenerFactory);
+        }
+
+        @Override
         public boolean skipJobCapacityAssignment() {
             return oc.skipJobCapacityAssignment();
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java
index 0f5798e..7de0adb 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java
@@ -18,8 +18,11 @@
  */
 package org.apache.hyracks.algebricks.compiler.api;
 
+import java.util.EnumSet;
+
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobSpecification;
 
 public interface ICompiler {
@@ -28,5 +31,8 @@
     public JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobEventListenerFactory)
             throws AlgebricksException;
 
+    JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobletEventListenerFactory,
+            EnumSet<JobFlag> runtimeFlags) throws AlgebricksException;
+
     boolean skipJobCapacityAssignment();
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index 99ef8d0..ca26515 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -74,6 +74,8 @@
 
     private int aodCounter = 0;
 
+    private boolean genLog2PhysMap = false;
+
     public JobBuilder(JobSpecification jobSpec, AlgebricksAbsolutePartitionConstraint clusterLocations) {
         this.jobSpec = jobSpec;
         this.clusterLocations = clusterLocations;
@@ -96,6 +98,10 @@
                 new String[] { clusterLocations.getLocations()[Math.abs(jobSpec.hashCode() % nPartitions)] });
     }
 
+    public void enableLog2PhysMapping() {
+        this.genLog2PhysMap = true;
+    }
+
     @Override
     public void contributeMicroOperator(ILogicalOperator op, IPushRuntimeFactory runtime, RecordDescriptor recDesc) {
         contributeMicroOperator(op, runtime, recDesc, null);
@@ -214,7 +220,11 @@
             jobSpec.addRoot(opDesc);
         }
         setAllPartitionConstraints(tgtConstraints);
-        jobSpec.setLogical2PhysicalMap(getLogical2PhysicalMap());
+        if (genLog2PhysMap) {
+            jobSpec.setLogical2PhysicalMap(getLogical2PhysicalMap());
+        } else {
+            jobSpec.setLogical2PhysicalMap(Collections.emptyMap());
+        }
     }
 
     public List<IOperatorDescriptor> getGeneratedMetaOps() {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index 2ef5c6c..f8bdfa5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -39,6 +39,8 @@
     private JobGenContext context;
     private Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> operatorVisitedToParents = new HashMap<>();
 
+    boolean genLog2PhysMap = false;
+
     public PlanCompiler(JobGenContext context) {
         this.context = context;
     }
@@ -47,6 +49,10 @@
         return context;
     }
 
+    public void enableLog2PhysMapping() {
+        this.genLog2PhysMap = true;
+    }
+
     public JobSpecification compilePlan(ILogicalPlan plan, IJobletEventListenerFactory jobEventListenerFactory)
             throws AlgebricksException {
         return compilePlanImpl(plan, false, null, jobEventListenerFactory);
@@ -66,6 +72,9 @@
         }
         List<ILogicalOperator> rootOps = new ArrayList<>();
         JobBuilder builder = new JobBuilder(spec, context.getClusterLocations());
+        if (genLog2PhysMap) {
+            builder.enableLog2PhysMapping();
+        }
         for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
             compileOpRef(opRef, spec, builder, outerPlanSchema);
             rootOps.add(opRef.getValue());
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 0ce2ff5..202c087 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -77,7 +77,7 @@
             for (int j = 0; j < newRuntimes.length; j++) {
                 //ETS is wrapped externally, and doesn't need the micro-op wrapper since it isn't a pipeline
                 //we also want to avoid any instances of NoOp stats in the pipeline that snuck in somehow
-                boolean shouldProfile = !(runtimeFactory instanceof EmptyTupleSourceRuntimeFactory) && profile
+                boolean shouldProfile = profile && !(runtimeFactory instanceof EmptyTupleSourceRuntimeFactory)
                         && microOpStats.containsKey(runtimeFactory);
                 if (shouldProfile) {
                     ProfiledPushRuntime profiled;