[ASTERIXDB-3239][API] Add profile summary to plan

- user model changes: yes
- storage format changes: no
- interface changes: no

Details:

If profiling is enabled, annotate the optimized plan
with the min & max time for each operator and connector
across partitions.
This can give a good idea of where the most time was
spent in a query generally.

Change-Id: I08b509fca0eb3ab9aad82e816dfe4849adf125ec
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17709
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Contrib: Ian Maxon <imaxon@uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 41be44b..5cf5fa8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -114,6 +114,7 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
  * Provides helper methods for compilation of a query into a JobSpec and submission
@@ -131,6 +132,7 @@
     private final IRuleSetFactory ruleSetFactory;
     private final Set<String> configurableParameterNames;
     private final ExecutionPlans executionPlans;
+    private PlanInfo lastPlan;
 
     public APIFramework(ILangCompilationProvider compilationProvider) {
         this.rewriterFactory = compilationProvider.getRewriterFactory();
@@ -139,6 +141,22 @@
         this.ruleSetFactory = compilationProvider.getRuleSetFactory();
         this.configurableParameterNames = compilationProvider.getCompilerOptions();
         executionPlans = new ExecutionPlans();
+        lastPlan = null;
+    }
+
+    private class PlanInfo {
+        ILogicalPlan plan;
+        Map<Object, String> log2Phys;
+        boolean printOptimizerEstimates;
+        SessionConfig.PlanFormat format;
+
+        public PlanInfo(ILogicalPlan plan, Map<Object, String> log2Phys, boolean printOptimizerEstimates,
+                SessionConfig.PlanFormat format) {
+            this.plan = plan;
+            this.log2Phys = log2Phys;
+            this.printOptimizerEstimates = printOptimizerEstimates;
+            this.format = format;
+        }
     }
 
     private static class OptimizationContextFactory implements IOptimizationContextFactory {
@@ -327,6 +345,7 @@
             if (isQuery || isLoad) {
                 generateOptimizedLogicalPlan(plan, spec.getLogical2PhysicalMap(), output.config().getPlanFormat(),
                         cboMode);
+                lastPlan = new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat());
             }
         }
 
@@ -535,6 +554,20 @@
                 getPrettyPrintVisitor(format).printPlan(plan, log2phys, printOptimizerEstimates).toString());
     }
 
+    public void generateOptimizedLogicalPlanWithProfile(ObjectNode profile) throws HyracksDataException {
+        /*TODO(ian): we call this and overwrite the non-annotated plan, but there should be some way to skip initial
+                     plan printing if both profiling and plan printing are requested. */
+        try {
+            if (lastPlan != null) {
+                executionPlans.setOptimizedLogicalPlan(getPrettyPrintVisitor(lastPlan.format)
+                        .printPlan(lastPlan.plan, lastPlan.log2Phys, lastPlan.printOptimizerEstimates, profile)
+                        .toString());
+            }
+        } catch (AlgebricksException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
     private void generateOptimizedLogicalPlan(ILogicalPlan plan, SessionConfig.PlanFormat format,
             boolean printOptimizerEstimates) throws AlgebricksException {
         executionPlans.setOptimizedLogicalPlan(
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 9f6a5cb..cb64c01 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4780,6 +4780,7 @@
         stats.setProcessedObjects(resultMetadata.getProcessedObjects());
         if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) {
             stats.setJobProfile(resultMetadata.getJobProfile());
+            apiFramework.generateOptimizedLogicalPlanWithProfile(resultMetadata.getJobProfile());
         }
         stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount());
         WarningUtil.mergeWarnings(resultMetadata.getWarnings(), warningCollector);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/pom.xml b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
index d1294e2..a67dd71 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
@@ -111,5 +111,9 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java
index 2b53de4..9f138b5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java
@@ -25,6 +25,8 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 /**
  * Note: Some implementations may be stateful and not thread-safe.
  */
@@ -49,6 +51,10 @@
     IPlanPrettyPrinter printPlan(ILogicalPlan plan, Map<Object, String> log2phys, boolean printOptimizerEstimates)
             throws AlgebricksException;
 
+    /** Prints the logical plan, annotated with physical operator and connector ids, and profiling info*/
+    IPlanPrettyPrinter printPlan(ILogicalPlan plan, Map<Object, String> log2phys, boolean printOptimizerEstimates,
+            ObjectNode profile) throws AlgebricksException;
+
     /** Resets the state of the pretty printer. */
     IPlanPrettyPrinter reset() throws AlgebricksException;
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 7aabbef..91217f1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -76,6 +76,8 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPrettyPrintVisitor<Integer>
         implements IPlanPrettyPrinter {
 
@@ -108,6 +110,14 @@
     }
 
     @Override
+    public IPlanPrettyPrinter printPlan(ILogicalPlan plan, Map<Object, String> log2phys,
+            boolean printOptimizerEstimates, ObjectNode profile) throws AlgebricksException {
+        //TODO(ian): add times
+        printPlanImpl(plan, 0, printOptimizerEstimates);
+        return this;
+    }
+
+    @Override
     public final IPlanPrettyPrinter printOperator(AbstractLogicalOperator op, boolean printInputs,
             boolean printOptimizerEstimates) throws AlgebricksException {
         printOperatorImpl(op, 0, printInputs, printOptimizerEstimates);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index e54ef02..32c1464 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -82,12 +82,15 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.util.DefaultIndenter;
 import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperatorPrettyPrintVisitor<Void>
         implements IPlanPrettyPrinter {
@@ -104,6 +107,7 @@
     private static final String OPTIMIZER_ESTIMATES = "optimizer-estimates";
     private final Map<AbstractLogicalOperator, String> operatorIdentity = new HashMap<>();
     private Map<Object, String> log2odid = Collections.emptyMap();
+    private Map<String, ProfileInfo> profile = Collections.emptyMap();
     private final IdCounter idCounter = new IdCounter();
     private final JsonGenerator jsonGenerator;
 
@@ -151,6 +155,50 @@
         }
     }
 
+    private class ProfileInfo {
+        Map<Integer, Pair<Double, Double>> activities;
+
+        ProfileInfo() {
+            activities = new HashMap<>();
+        }
+
+        void visit(int id, double time) {
+            Pair<Double, Double> times = activities.computeIfAbsent(id, i -> new Pair(time, time));
+            if (times.getFirst() > time) {
+                times.setFirst(time);
+            }
+            if (times.getSecond() < time) {
+                times.setSecond(time);
+            }
+        }
+    }
+
+    private static ActivityId acIdFromName(String name) {
+        String[] parts = name.split(" - ");
+        return ActivityId.parse(parts[0]);
+    }
+
+    Map<String, ProfileInfo> processProfile(ObjectNode profile) {
+        Map<String, ProfileInfo> profiledOps = new HashMap<>();
+        for (JsonNode joblet : profile.get("joblets")) {
+            for (JsonNode task : joblet.get("tasks")) {
+                for (JsonNode counters : task.get("counters")) {
+                    ProfileInfo info =
+                            profiledOps.computeIfAbsent(counters.get("runtime-id").asText(), i -> new ProfileInfo());
+                    info.visit(acIdFromName(counters.get("name").asText()).getLocalId(),
+                            counters.get("run-time").asDouble());
+                }
+                for (JsonNode partition : task.get("partition-send-profile")) {
+                    String id = partition.get("partition-id").get("connector-id").asText();
+                    ProfileInfo info = profiledOps.computeIfAbsent(id, i -> new ProfileInfo());
+                    //CDIDs are unique
+                    info.visit(0, partition.get("close-time").asDouble() - partition.get("open-time").asDouble());
+                }
+            }
+        }
+        return profiledOps;
+    }
+
     @Override
     public final IPlanPrettyPrinter reset() throws AlgebricksException {
         flushContentToWriter();
@@ -177,6 +225,16 @@
     }
 
     @Override
+    public IPlanPrettyPrinter printPlan(ILogicalPlan plan, Map<Object, String> log2phys,
+            boolean printOptimizerEstimates, ObjectNode profile) throws AlgebricksException {
+        this.log2odid = log2phys;
+        this.profile = processProfile(profile);
+        printPlanImpl(plan, printOptimizerEstimates);
+        flushContentToWriter();
+        return this;
+    }
+
+    @Override
     public final IPlanPrettyPrinter printOperator(AbstractLogicalOperator op, boolean printInputs,
             boolean printOptimizerEstimates) throws AlgebricksException {
         printOperatorImpl(op, printInputs, printOptimizerEstimates);
@@ -210,6 +268,23 @@
             String od = log2odid.get(op);
             if (od != null) {
                 jsonGenerator.writeStringField("runtime-id", od);
+                ProfileInfo info = profile.get(od);
+                if (info != null) {
+                    if (info.activities.size() == 1) {
+                        jsonGenerator.writeNumberField("min-time", info.activities.get(0).first);
+                        jsonGenerator.writeNumberField("max-time", info.activities.get(0).second);
+                    } else {
+                        jsonGenerator.writeObjectFieldStart("times");
+                        for (Map.Entry<Integer, Pair<Double, Double>> ac : info.activities.entrySet()) {
+                            jsonGenerator.writeObjectFieldStart(ac.getKey().toString());
+                            jsonGenerator.writeNumberField("min-time", ac.getValue().first);
+                            jsonGenerator.writeNumberField("max-time", ac.getValue().second);
+                            jsonGenerator.writeEndObject();
+                        }
+                        jsonGenerator.writeEndObject();
+                    }
+
+                }
             }
             IPhysicalOperator pOp = op.getPhysicalOperator();
             if (pOp != null) {