Merge "Merge branch 'gerrit/trinity' into 'master'"
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 9517cf0..9bd65a6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -25,6 +25,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -108,6 +109,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
@@ -205,7 +207,8 @@
     public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
             Query query, int varCounter, String outputDatasetName, SessionOutput output, ICompiledStatement statement,
             Map<VarIdentifier, IAObject> externalVars, IResponsePrinter printer, IWarningCollector warningCollector,
-            IRequestParameters requestParameters) throws AlgebricksException, ACIDException {
+            IRequestParameters requestParameters, EnumSet<JobFlag> runtimeFlags)
+            throws AlgebricksException, ACIDException {
 
         // establish facts
         final boolean isQuery = query != null;
@@ -326,7 +329,7 @@
 
         JobEventListenerFactory jobEventListenerFactory =
                 new JobEventListenerFactory(txnId, metadataProvider.isWriteTransaction());
-        JobSpecification spec = compiler.createJob(ccAppContext, jobEventListenerFactory);
+        JobSpecification spec = compiler.createJob(ccAppContext, jobEventListenerFactory, runtimeFlags);
 
         if (isQuery) {
             if (!compiler.skipJobCapacityAssignment()) {
@@ -348,7 +351,10 @@
             if (isQuery || isLoad || isCopy) {
                 generateOptimizedLogicalPlan(plan, spec.getLogical2PhysicalMap(), output.config().getPlanFormat(),
                         cboMode);
-                lastPlan = new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat());
+                if (runtimeFlags.contains(JobFlag.PROFILE_RUNTIME)) {
+                    lastPlan =
+                            new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat());
+                }
             }
         }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index d1a1008..7ac1431 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -100,7 +100,7 @@
             metadata.setQueueWaitTimeInNanos(run.getJobProfile().getQueueWaitTimeInNanos());
         }
         metadata.setProcessedObjects(processedObjects);
-        metadata.setBufferCacheHitRatio(pagesRead > 0 ? (pagesRead - nonPagedReads) / pagesRead : Double.NaN);
+        metadata.setBufferCacheHitRatio(pagesRead > 0 ? (pagesRead - nonPagedReads) / (double) pagesRead : Double.NaN);
         metadata.setWarnings(AggregateWarnings);
         metadata.setTotalWarningsCount(aggregateTotalWarningsCount);
         if (run != null && run.getFlags() != null && run.getFlags().contains(JobFlag.PROFILE_RUNTIME)) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d52429f..a99fc22 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -3870,7 +3870,7 @@
                     loadStmt.getDatasetName(), loadStmt.getAdapter(), properties, loadStmt.dataIsAlreadySorted());
             cls.setSourceLocation(stmt.getSourceLocation());
             JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
-                    null, responsePrinter, warningCollector, null);
+                    null, responsePrinter, warningCollector, null, jobFlags);
             afterCompile();
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -3948,7 +3948,7 @@
                     copyStmt.getDatasetName(), itemType, externalDetails.getAdapter(), properties);
             cls.setSourceLocation(stmt.getSourceLocation());
             JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
-                    null, responsePrinter, warningCollector, null);
+                    null, responsePrinter, warningCollector, null, jobFlags);
             afterCompile();
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -4041,7 +4041,7 @@
                 // Query Compilation (happens under the same ongoing metadata transaction)
                 final JobSpecification jobSpec = apiFramework.compileQuery(hcc, metadataProvider, copyTo.getQuery(),
                         rewrittenResult.second, null, sessionOutput, compiledCopyToStatement, externalVars,
-                        responsePrinter, warningCollector, requestParameters);
+                        responsePrinter, warningCollector, requestParameters, jobFlags);
                 // update stats with count of compile-time warnings. needs to be adapted for multi-statement.
                 stats.updateTotalWarningsCount(warningCollector.getTotalWarningsCount());
                 afterCompile();
@@ -4238,7 +4238,7 @@
         // Query Compilation (happens under the same ongoing metadata transaction)
         return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, (Query) rewrittenResult.first,
                 rewrittenResult.second, stmt == null ? null : stmt.getDatasetName(), sessionOutput, stmt, externalVars,
-                responsePrinter, warningCollector, requestParameters);
+                responsePrinter, warningCollector, requestParameters, jobFlags);
     }
 
     protected JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector clusterInfoCollector,
@@ -4281,7 +4281,7 @@
         // transaction)
         return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, rewrittenInsertUpsert.getQuery(),
                 rewrittenResult.second, datasetName, sessionOutput, clfrqs, externalVars, responsePrinter,
-                warningCollector, null);
+                warningCollector, null, jobFlags);
     }
 
     protected void handleCreateFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java
index 7c945fc..413f002 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java
@@ -30,9 +30,6 @@
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-/**
- * Runs the cluster state runtime tests with the storage parallelism.
- */
 @RunWith(Parameterized.class)
 public class ProfiledExecutionTest {
     protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/cc-single.conf";
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
index a404639..d3664d4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
@@ -44,5 +44,11 @@
         <output-dir compare="Text">sleep</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="profile">
+      <compilation-unit name="non-unary-subplan">
+        <parameter name="profile" value="timings" type="string"/>
+        <output-dir compare="Text">non-unary-subplan</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.1.ddl.sqlpp
new file mode 100644
index 0000000..c1ce4a3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.1.ddl.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create type TType as open
+{ id: bigint };
+
+create dataset TData (TType) primary key id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.2.update.sqlpp
new file mode 100644
index 0000000..3ce9554
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.2.update.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into TData ( [
+{'id':1, 'x':1, 'f':19},
+{'id':2, 'x':2, 'f':12},
+{'id':3, 'x':1, 'f':10},
+{'id':4, 'x':2, 'f':17},
+{'id':5, 'x':1, 'f':12}
+]);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.3.profile.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.3.profile.sqlpp
new file mode 100644
index 0000000..4fa986c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.3.profile.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- compareunorderedarray=true
+use test;
+
+select value
+array_sum((
+   select value a.f
+   from g as p
+   union all
+   select value a.f
+   from g as w
+))
+from TData as a
+group by a.x as x group as g
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp
index 82d030e..6c35376 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp
@@ -24,4 +24,4 @@
 FROM Customers c
 WHERE c.age <65
 GROUP BY c.address.city
-ORDER BY sleep(city,1667);
\ No newline at end of file
+ORDER BY sleep(city,1700);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
index 88107b0..03900f3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
@@ -77,6 +77,24 @@
                     {
                         "name": "R{.+}",
                         "run-time": "R{[0-9.]+}",
+                        "runtime-id": "R{.+}",
+                        "cardinality-out": "R{[0-9.]+}",
+                        "avg-tuple-size": "R{[0-9.]+}",
+                        "min-tuple-size": "R{[0-9.]+}",
+                        "max-tuple-size": "R{[0-9.]+}"
+                    },
+                    {
+                        "name": "R{.+}",
+                        "run-time": "R{[0-9.]+}",
+                        "runtime-id": "R{.+}",
+                        "cardinality-out": "R{[0-9.]+}",
+                        "avg-tuple-size": "R{[0-9.]+}",
+                        "min-tuple-size": "R{[0-9.]+}",
+                        "max-tuple-size": "R{[0-9.]+}"
+                    },
+                    {
+                        "name": "R{.+}",
+                        "run-time": "R{[0-9.]+}",
                         "runtime-id": "R{.+}"
                     }
                   ]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson
new file mode 100644
index 0000000..e59f095
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson
@@ -0,0 +1,241 @@
+{
+    "job-id": "R{[A-Z0-9.:]+}",
+    "create-time": "R{[0-9.]+}",
+    "start-time": "R{[0-9.]+}",
+    "queued-time": "R{.+}",
+    "end-time": "R{[0-9.]+}",
+    "counters": [],
+    "joblets": [
+        {
+            "node-id": "R{.+}",
+            "counters": [],
+            "tasks": [
+                {
+                    "activity-id": "R{[A-Z0-9.:]+}",
+                    "partition": "R{[0-9]+}",
+                    "attempt": "R{[0-9]+}",
+                    "partition-send-profile": [],
+                    "counters": [
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}"
+                        }
+                    ]
+                },
+                {
+                    "activity-id": "R{[A-Z0-9.:]+}",
+                    "partition": "R{[0-9]+}",
+                    "attempt": "R{[0-9]+}",
+                    "partition-send-profile": [],
+                    "counters": [
+                        {
+                            "name": "R{ANID:ODID:[0-9]:0\\.1 - MicroOp Subplan(?:.|\n)+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}"
+                        }
+                    ]
+                },
+                {
+                    "activity-id": "R{[A-Z0-9.:]+}",
+                    "partition": "R{[0-9]+}",
+                    "attempt": "R{[0-9]+}",
+                    "partition-send-profile": [
+                        {
+                            "partition-id": {
+                                "job-id": "R{[A-Z0-9.:]+}",
+                                "connector-id": "R{[A-Z0-9.:]+}",
+                                "sender-index": "R{[0-9]+}",
+                                "receiver-index": "R{[0-9]+}"
+                            },
+                            "open-time": "R{[0-9]+}",
+                            "close-time": "R{[0-9]+}",
+                            "offset": "R{[0-9]+}",
+                            "frame-times": [
+                                0
+                            ],
+                            "resolution": 1
+                        }
+                    ],
+                    "counters": [
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        }
+                    ]
+                }
+            ]
+    }]
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson
index bfb8c62..e6d1c0a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson
@@ -19,6 +19,15 @@
                         {
                             "name": "R{.+}",
                             "run-time": "R{[0-9.]+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": "R{[0-9.]+}",
+                            "avg-tuple-size": "R{[0-9.]+}",
+                            "min-tuple-size": "R{[0-9.]+}",
+                            "max-tuple-size": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "run-time": "R{[0-9.]+}",
                             "runtime-id": "R{.+}"
                         },
                         {
@@ -34,6 +43,15 @@
                         },
                         {
                             "name": "R{.+}",
+                            "run-time": "R{5.+}",
+                            "runtime-id": "R{.+}",
+                            "cardinality-out": 10,
+                            "avg-tuple-size": 38,
+                            "min-tuple-size": 38,
+                            "max-tuple-size": 38
+                        },
+                        {
+                            "name": "R{.+}",
                             "run-time": "R{[0-9.]+}",
                             "runtime-id": "R{.+}",
                             "cardinality-out": "R{[0-9.]+}",
@@ -43,12 +61,12 @@
                         },
                         {
                           "name": "R{.+}",
-                          "run-time": "R{5.+}",
+                          "run-time": "R{[0-9.]+}",
                           "runtime-id": "R{.+}",
-                          "cardinality-out": 10,
-                          "avg-tuple-size": 25,
-                          "min-tuple-size": 25,
-                          "max-tuple-size": 25
+                          "cardinality-out": "R{[0-9.]+}",
+                          "avg-tuple-size": "R{[0-9.]+}",
+                          "min-tuple-size": "R{[0-9.]+}",
+                          "max-tuple-size": "R{[0-9.]+}"
                         }
                     ]
                 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
index 012133e..e3e7647 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
@@ -80,6 +80,15 @@
                         "name": "R{.+}",
                         "run-time": "R{[0-9.]+}",
                         "runtime-id": "R{.+}"
+                    },
+                    {
+                        "name": "R{.+}",
+                        "run-time": "R{[0-9.]+}",
+                        "runtime-id": "R{.+}",
+                        "cardinality-out": "R{[0-9.]+}",
+                        "avg-tuple-size": "R{[0-9.]+}",
+                        "min-tuple-size": "R{[0-9.]+}",
+                        "max-tuple-size": "R{[0-9.]+}"
                     }
                   ]
                 },
@@ -93,10 +102,24 @@
                         "name": "R{.+}",
                         "run-time": "R{[0-9.]+}",
                         "runtime-id": "R{.+}",
-                        "cardinality-out": "R{[0-9.]+}",
-                        "avg-tuple-size": "R{[0-9.]+}",
-                        "min-tuple-size": "R{[0-9.]+}",
-                        "max-tuple-size": "R{[0-9.]+}"
+                        "cardinality-out": 10,
+                        "avg-tuple-size": 140,
+                        "min-tuple-size": 137,
+                        "max-tuple-size": 151
+                    },
+                    {
+                        "name": "R{.+}",
+                        "run-time": "R{[0-9.]+}",
+                        "runtime-id": "R{.+}",
+                        "cardinality-out": 5,
+                        "avg-tuple-size": 145,
+                        "min-tuple-size": 142,
+                        "max-tuple-size": 151
+                    },
+                    {
+                        "name": "R{.+}",
+                        "run-time": "R{[0-9.]+}",
+                        "runtime-id": "R{.+}"
                     },
                     {
                         "name": "R{.+}",
@@ -111,17 +134,30 @@
                     },
                     {
                         "name": "R{.+}",
+                        "run-time": "R{[0-9.]+}",
+                        "runtime-id": "R{.+}",
+                        "cardinality-out": 5,
+                        "avg-tuple-size": 16,
+                        "min-tuple-size": 14,
+                        "max-tuple-size": 19
+                    },
+                    {
+                        "name": "R{.+}",
                         "run-time": "R{5.+}",
                         "runtime-id": "R{.+}",
-                        "cardinality-out": "R{[0-9.]+}",
-                        "avg-tuple-size": "R{[0-9.]+}",
-                        "min-tuple-size": "R{[0-9.]+}",
-                        "max-tuple-size": "R{[0-9.]+}"
+                        "cardinality-out": 5,
+                        "avg-tuple-size": 161,
+                        "min-tuple-size": 156,
+                        "max-tuple-size": 170
                     },
                     {
                         "name": "R{.+}",
                         "run-time": "R{[0-9.]+}",
-                        "runtime-id": "R{.+}"
+                        "runtime-id": "R{.+}",
+                        "cardinality-out": 1,
+                        "avg-tuple-size": 0,
+                        "min-tuple-size": 0,
+                        "max-tuple-size": 0
                     }
                   ]
                 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
index b024312..98d7930 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
@@ -58,7 +58,16 @@
                   "attempt": "R{[0-9]+}",
                   "partition-send-profile": [],
                   "counters": [
-                  {
+                    {
+                        "name": "R{.+}",
+                        "run-time": "R{5.+}",
+                        "runtime-id": "R{.+}",
+                        "cardinality-out": 3,
+                        "avg-tuple-size": 75,
+                        "min-tuple-size": 67,
+                        "max-tuple-size": 82
+                    },
+                    {
                         "name": "R{.+}",
                         "run-time": "R{[0-9.]+}",
                         "runtime-id": "R{.+}",
@@ -69,7 +78,25 @@
                     },
                     {
                         "name": "R{.+}",
-                        "run-time": "R{5.+}",
+                        "run-time": "R{[0-9.]+}",
+                        "runtime-id": "R{.+}",
+                        "cardinality-out": "R{[0-9.]+}",
+                        "avg-tuple-size": "R{[0-9.]+}",
+                        "min-tuple-size": "R{[0-9.]+}",
+                        "max-tuple-size": "R{[0-9.]+}"
+                    },
+                    {
+                        "name": "R{.+}",
+                        "run-time": "R{[0-9.]+}",
+                        "runtime-id": "R{.+}",
+                        "cardinality-out": "R{[0-9.]+}",
+                        "avg-tuple-size": "R{[0-9.]+}",
+                        "min-tuple-size": "R{[0-9.]+}",
+                        "max-tuple-size": "R{[0-9.]+}"
+                    },
+                    {
+                        "name": "R{.+}",
+                        "run-time": "R{[0-9.]+}",
                         "runtime-id": "R{.+}",
                         "cardinality-out": "R{[0-9.]+}",
                         "avg-tuple-size": "R{[0-9.]+}",
@@ -153,6 +180,33 @@
                         "name": "R{.+}",
                         "run-time": "R{[0-9.]+}",
                         "runtime-id": "R{.+}",
+                        "cardinality-out": "R{[0-9.]+}",
+                        "avg-tuple-size": "R{[0-9.]+}",
+                        "min-tuple-size": "R{[0-9.]+}",
+                        "max-tuple-size": "R{[0-9.]+}"
+                    },
+                    {
+                        "name": "R{.+}",
+                        "run-time": "R{[0-9.]+}",
+                        "runtime-id": "R{.+}",
+                        "cardinality-out": "R{[0-9.]+}",
+                        "avg-tuple-size": "R{[0-9.]+}",
+                        "min-tuple-size": "R{[0-9.]+}",
+                        "max-tuple-size": "R{[0-9.]+}"
+                    },
+                    {
+                        "name": "R{.+}",
+                        "run-time": "R{[0-9.]+}",
+                        "runtime-id": "R{.+}",
+                        "cardinality-out": "R{[0-9.]+}",
+                        "avg-tuple-size": "R{[0-9.]+}",
+                        "min-tuple-size": "R{[0-9.]+}",
+                        "max-tuple-size": "R{[0-9.]+}"
+                    },
+                    {
+                        "name": "R{.+}",
+                        "run-time": "R{[0-9.]+}",
+                        "runtime-id": "R{.+}",
                         "pages-read": "R{[0-9.]+}",
                         "pages-read-cold": "R{[0-9.]+}",
                         "cardinality-out": "R{[0-9.]+}",
@@ -162,7 +216,7 @@
                     },
                     {
                         "name": "R{.+}",
-                        "run-time": "R{[0-9].+}",
+                        "run-time": "R{[0-9.]+}",
                         "runtime-id": "R{.+}",
                         "cardinality-out": "R{[0-9.]+}",
                         "avg-tuple-size": "R{[0-9.]+}",
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java
index 2347fa5..f22f5da 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java
@@ -46,6 +46,11 @@
     }
 
     @Override
+    public String toString() {
+        return "ClosedRecordConstructor";
+    }
+
+    @Override
     public IScalarEvaluator createScalarEvaluator(IEvaluatorContext ctx) throws HyracksDataException {
         int n = args.length / 2;
         IScalarEvaluator[] evalFields = new IScalarEvaluator[n];
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
index f712bdd..6c7f2f0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
@@ -19,6 +19,8 @@
 
 package org.apache.asterix.runtime.operators;
 
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
@@ -85,7 +87,7 @@
                 writer.open();
                 IStatsCollector coll = ctx.getStatsCollector();
                 if (coll != null) {
-                    coll.add(new OperatorStats(operatorName));
+                    coll.add(new OperatorStats(operatorName, INVALID_ODID));
                 }
                 INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
                 indexesStats = new HashMap<>();
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 85910aa..a44e1be 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.algebricks.compiler.api;
 
+import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -47,6 +48,7 @@
 import org.apache.hyracks.algebricks.runtime.writers.SerializedDataWriterFactory;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobSpecification;
 
 public class HeuristicCompilerFactoryBuilder extends AbstractCompilerFactoryBuilder {
@@ -172,6 +174,17 @@
         }
 
         @Override
+        public JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobEventListenerFactory,
+                EnumSet<JobFlag> runtimeFlags) throws AlgebricksException {
+            AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting Job Generation.\n");
+            PlanCompiler pc = factory.createPlanCompiler(oc, appContext, writerFactory);
+            if (runtimeFlags.contains(JobFlag.PROFILE_RUNTIME)) {
+                pc.enableLog2PhysMapping();
+            }
+            return pc.compilePlan(plan, jobEventListenerFactory);
+        }
+
+        @Override
         public boolean skipJobCapacityAssignment() {
             return oc.skipJobCapacityAssignment();
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java
index 0f5798e..7de0adb 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java
@@ -18,8 +18,11 @@
  */
 package org.apache.hyracks.algebricks.compiler.api;
 
+import java.util.EnumSet;
+
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobSpecification;
 
 public interface ICompiler {
@@ -28,5 +31,8 @@
     public JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobEventListenerFactory)
             throws AlgebricksException;
 
+    JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobletEventListenerFactory,
+            EnumSet<JobFlag> runtimeFlags) throws AlgebricksException;
+
     boolean skipJobCapacityAssignment();
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 3e4e09f..d1a356c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -25,6 +25,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -80,7 +81,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
-import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 
 import com.fasterxml.jackson.core.JsonFactory;
@@ -105,7 +106,7 @@
     private static final String OPTIMIZER_ESTIMATES = "optimizer-estimates";
     private final Map<AbstractLogicalOperator, String> operatorIdentity = new HashMap<>();
     private Map<Object, String> log2odid = Collections.emptyMap();
-    private Map<String, ProfileInfo> profile = Collections.emptyMap();
+    private Map<String, OperatorProfile> profile = Collections.emptyMap();
     private final IdCounter idCounter = new IdCounter();
     private final JsonGenerator jsonGenerator;
 
@@ -153,15 +154,88 @@
         }
     }
 
-    private class ProfileInfo {
-        Map<Integer, Pair<Double, Double>> activities;
+    private class ExtendedActivityId {
+        private final OperatorDescriptorId odId;
+        private final int id;
+        private final int microId;
+        private final int subPipe;
+        private final int subId;
 
-        ProfileInfo() {
+        ExtendedActivityId(String str) {
+            if (str.startsWith("ANID:")) {
+                str = str.substring(5);
+                int idIdx = str.lastIndexOf(':');
+                this.odId = OperatorDescriptorId.parse(str.substring(0, idIdx));
+                String[] parts = str.substring(idIdx + 1).split("\\.");
+                this.id = Integer.parseInt(parts[0]);
+                if (parts.length >= 2) {
+                    this.microId = Integer.parseInt(parts[1]);
+                } else {
+                    this.microId = -1;
+                }
+                if (parts.length >= 4) {
+                    this.subPipe = Integer.parseInt(parts[2]);
+                    this.subId = Integer.parseInt(parts[3]);
+                } else {
+                    this.subPipe = -1;
+                    this.subId = -1;
+                }
+            } else {
+                throw new IllegalArgumentException("Unable to parse: " + str);
+            }
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(values());
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return (o instanceof ExtendedActivityId) && Objects.equals(((ExtendedActivityId) o).values(), values());
+        }
+
+        private List<?> values() {
+            return List.of(odId, id, microId, subPipe, subId);
+        }
+
+        @Override
+        public String toString() {
+            return "ANID:" + odId + ":" + getLocalId();
+        }
+
+        private void catenateId(StringBuilder sb, int i) {
+            if (sb.length() == 0) {
+                sb.append(i);
+                return;
+            }
+            sb.append(".");
+            sb.append(i);
+        }
+
+        public String getLocalId() {
+            StringBuilder sb = new StringBuilder();
+            catenateId(sb, odId.getId());
+            if (microId > 0) {
+                catenateId(sb, microId);
+            }
+            if (subId > 0) {
+                catenateId(sb, subPipe);
+                catenateId(sb, subId);
+            }
+            return sb.toString();
+        }
+    }
+
+    private class OperatorProfile {
+        Map<String, Pair<Double, Double>> activities;
+
+        OperatorProfile() {
             activities = new HashMap<>();
         }
 
-        void visit(int id, double time) {
-            Pair<Double, Double> times = activities.computeIfAbsent(id, i -> new Pair(time, time));
+        void updateOperator(String extendedOpId, double time) {
+            Pair<Double, Double> times = activities.computeIfAbsent(extendedOpId, i -> new Pair(time, time));
             if (times.getFirst() > time) {
                 times.setFirst(time);
             }
@@ -171,26 +245,27 @@
         }
     }
 
-    private static ActivityId acIdFromName(String name) {
+    private ExtendedActivityId acIdFromName(String name) {
         String[] parts = name.split(" - ");
-        return ActivityId.parse(parts[0]);
+        return new ExtendedActivityId(parts[0]);
     }
 
-    Map<String, ProfileInfo> processProfile(ObjectNode profile) {
-        Map<String, ProfileInfo> profiledOps = new HashMap<>();
+    Map<String, OperatorProfile> processProfile(ObjectNode profile) {
+        Map<String, OperatorProfile> profiledOps = new HashMap<>();
         for (JsonNode joblet : profile.get("joblets")) {
             for (JsonNode task : joblet.get("tasks")) {
                 for (JsonNode counters : task.get("counters")) {
-                    ProfileInfo info =
-                            profiledOps.computeIfAbsent(counters.get("runtime-id").asText(), i -> new ProfileInfo());
-                    info.visit(acIdFromName(counters.get("name").asText()).getLocalId(),
+                    OperatorProfile info = profiledOps.computeIfAbsent(counters.get("runtime-id").asText(),
+                            i -> new OperatorProfile());
+                    info.updateOperator(acIdFromName(counters.get("name").asText()).getLocalId(),
                             counters.get("run-time").asDouble());
                 }
                 for (JsonNode partition : task.get("partition-send-profile")) {
                     String id = partition.get("partition-id").get("connector-id").asText();
-                    ProfileInfo info = profiledOps.computeIfAbsent(id, i -> new ProfileInfo());
+                    OperatorProfile info = profiledOps.computeIfAbsent(id, i -> new OperatorProfile());
                     //CDIDs are unique
-                    info.visit(0, partition.get("close-time").asDouble() - partition.get("open-time").asDouble());
+                    info.updateOperator("0",
+                            partition.get("close-time").asDouble() - partition.get("open-time").asDouble());
                 }
             }
         }
@@ -266,15 +341,16 @@
             String od = log2odid.get(op);
             if (od != null) {
                 jsonGenerator.writeStringField("runtime-id", od);
-                ProfileInfo info = profile.get(od);
+                OperatorProfile info = profile.get(od);
                 if (info != null) {
                     if (info.activities.size() == 1) {
-                        jsonGenerator.writeNumberField("min-time", info.activities.get(0).first);
-                        jsonGenerator.writeNumberField("max-time", info.activities.get(0).second);
+                        Pair<Double, Double> minMax = info.activities.values().iterator().next();
+                        jsonGenerator.writeNumberField("min-time", minMax.first);
+                        jsonGenerator.writeNumberField("max-time", minMax.second);
                     } else {
                         jsonGenerator.writeObjectFieldStart("times");
-                        for (Map.Entry<Integer, Pair<Double, Double>> ac : info.activities.entrySet()) {
-                            jsonGenerator.writeObjectFieldStart(ac.getKey().toString());
+                        for (Map.Entry<String, Pair<Double, Double>> ac : info.activities.entrySet()) {
+                            jsonGenerator.writeObjectFieldStart(ac.getKey());
                             jsonGenerator.writeNumberField("min-time", ac.getValue().first);
                             jsonGenerator.writeNumberField("max-time", ac.getValue().second);
                             jsonGenerator.writeEndObject();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index d8bf190..ca26515 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -38,6 +39,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
@@ -72,6 +74,8 @@
 
     private int aodCounter = 0;
 
+    private boolean genLog2PhysMap = false;
+
     public JobBuilder(JobSpecification jobSpec, AlgebricksAbsolutePartitionConstraint clusterLocations) {
         this.jobSpec = jobSpec;
         this.clusterLocations = clusterLocations;
@@ -94,6 +98,10 @@
                 new String[] { clusterLocations.getLocations()[Math.abs(jobSpec.hashCode() % nPartitions)] });
     }
 
+    public void enableLog2PhysMapping() {
+        this.genLog2PhysMap = true;
+    }
+
     @Override
     public void contributeMicroOperator(ILogicalOperator op, IPushRuntimeFactory runtime, RecordDescriptor recDesc) {
         contributeMicroOperator(op, runtime, recDesc, null);
@@ -148,11 +156,39 @@
         hyracksOps.put(op, opDesc);
     }
 
+    private String getExtendedOdidForMetaOp(ILogicalOperator op, int k) {
+        String base = metaAsterixOps.get(k).getOperatorId().toString();
+        Pair<IPushRuntimeFactory, RecordDescriptor> fact = microOps.get(op);
+        List<Pair<IPushRuntimeFactory, RecordDescriptor>> metaOpPipeline = metaAsterixOpSkeletons.get(k);
+        int pos = metaOpPipeline.indexOf(fact);
+        return base + "." + pos;
+    }
+
+    private void getExtendedOdidForSubplanOp(SubplanOperator op, Map<ILogicalOperator, String> log2phys) {
+        String baseId = getExtendedOdidForMetaOp(op, algebraicOpBelongingToMetaAsterixOp.get(op));
+        op.getNestedPlans().forEach(plan -> plan.getRoots()
+                .forEach(root -> getExtendedOdidForOperator(baseId, root.getValue(), log2phys, 0)));
+    }
+
+    private int getExtendedOdidForOperator(String baseId, ILogicalOperator op, Map<ILogicalOperator, String> log2phys,
+            int input) {
+        List<Mutable<ILogicalOperator>> inputs = op.getInputs();
+        List<Integer> paths = new ArrayList<>(inputs.size());
+        for (int i = 0; i < inputs.size(); i++) {
+            ILogicalOperator nextOp = inputs.get(i).getValue();
+            paths.add(i, getExtendedOdidForOperator(baseId, nextOp, log2phys, i + input));
+        }
+        int lPath = paths.size() > 0 ? Collections.max(paths) : 0;
+        log2phys.put(op, baseId + "." + input + "." + lPath);
+        return lPath + 1;
+    }
+
     public Map<Object, String> getLogical2PhysicalMap() {
         Map<ILogicalOperator, String> mergedOperatorMap = new HashMap<>();
         hyracksOps.forEach(((k, v) -> mergedOperatorMap.put(k, v.getOperatorId().toString())));
-        algebraicOpBelongingToMetaAsterixOp
-                .forEach((k, v) -> mergedOperatorMap.put(k, metaAsterixOps.get(v).getOperatorId().toString()));
+        algebraicOpBelongingToMetaAsterixOp.forEach((k, v) -> mergedOperatorMap.put(k, getExtendedOdidForMetaOp(k, v)));
+        microOps.keySet().stream().filter(op -> op instanceof SubplanOperator)
+                .forEach(op -> getExtendedOdidForSubplanOp((SubplanOperator) op, mergedOperatorMap));
         connectors.forEach((k, v) -> mergedOperatorMap.put(k, v.getFirst().getConnectorId().toString()));
         return Collections.unmodifiableMap(mergedOperatorMap);
     }
@@ -184,7 +220,11 @@
             jobSpec.addRoot(opDesc);
         }
         setAllPartitionConstraints(tgtConstraints);
-        jobSpec.setLogical2PhysicalMap(getLogical2PhysicalMap());
+        if (genLog2PhysMap) {
+            jobSpec.setLogical2PhysicalMap(getLogical2PhysicalMap());
+        } else {
+            jobSpec.setLogical2PhysicalMap(Collections.emptyMap());
+        }
     }
 
     public List<IOperatorDescriptor> getGeneratedMetaOps() {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index 2ef5c6c..f8bdfa5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -39,6 +39,8 @@
     private JobGenContext context;
     private Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> operatorVisitedToParents = new HashMap<>();
 
+    boolean genLog2PhysMap = false;
+
     public PlanCompiler(JobGenContext context) {
         this.context = context;
     }
@@ -47,6 +49,10 @@
         return context;
     }
 
+    public void enableLog2PhysMapping() {
+        this.genLog2PhysMap = true;
+    }
+
     public JobSpecification compilePlan(ILogicalPlan plan, IJobletEventListenerFactory jobEventListenerFactory)
             throws AlgebricksException {
         return compilePlanImpl(plan, false, null, jobEventListenerFactory);
@@ -66,6 +72,9 @@
         }
         List<ILogicalOperator> rootOps = new ArrayList<>();
         JobBuilder builder = new JobBuilder(spec, context.getClusterLocations());
+        if (genLog2PhysMap) {
+            builder.enableLog2PhysMapping();
+        }
         for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
             compileOpRef(opRef, spec, builder, outerPlanSchema);
             rootOps.add(opRef.getValue());
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
index 82b6f9c..b1bd46b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public interface IPushRuntimeFactory extends Serializable {
+
     IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException;
 
     default void setSourceLocation(SourceLocation sourceLoc) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
new file mode 100644
index 0000000..354f172
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.base;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.dataflow.ITimedWriter;
+import org.apache.hyracks.api.dataflow.ProfiledFrameWriter;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+
+public class ProfiledPushRuntime extends ProfiledFrameWriter implements IPushRuntime {
+
+    private final IPushRuntime wrapped;
+    private final IOperatorStats stats;
+
+    private final boolean last;
+
+    private final Map<Integer, ITimedWriter> outputs;
+
+    public ProfiledPushRuntime(IPushRuntime push, IOperatorStats stats, boolean last) {
+        super(push);
+        outputs = new HashMap<>();
+        this.wrapped = push;
+        this.stats = stats;
+        this.last = last;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        super.close();
+        long ownTime = getTotalTime();
+        //for micro union all. accumulate the time of each input into the counter.
+        //then, on input 0, subtract the output from the accumulated time.
+        if (!last) {
+            stats.getTimeCounter().update(ownTime);
+            return;
+        }
+        ownTime += stats.getTimeCounter().get();
+        for (ITimedWriter w : outputs.values()) {
+            ownTime -= w.getTotalTime();
+        }
+        stats.getTimeCounter().set(ownTime);
+    }
+
+    public IOperatorStats getStats() {
+        return stats;
+    }
+
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        if (writer instanceof ITimedWriter) {
+            ITimedWriter wrapper = (ITimedWriter) writer;
+            wrapper.setUpstreamStats(stats);
+            outputs.put(index, wrapper);
+        }
+        wrapped.setOutputFrameWriter(index, writer, recordDesc);
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        wrapped.setInputRecordDescriptor(index, recordDescriptor);
+    }
+
+    public static IPushRuntime time(IPushRuntime push, IOperatorStats stats, boolean last) throws HyracksDataException {
+        if (!(push instanceof ProfiledPushRuntime)) {
+            return new ProfiledPushRuntime(push, stats, last);
+        } else {
+            return push;
+        }
+    }
+
+    public static IPushRuntime time(IPushRuntime push, IOperatorStats stats) throws HyracksDataException {
+        return time(push, stats, true);
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index b3fef7f..2a3fa7e 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -19,26 +19,34 @@
 package org.apache.hyracks.algebricks.runtime.operators.meta;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.ITimedWriter;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
+import org.apache.hyracks.api.job.profiling.OperatorStats;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 3L;
 
     // array of factories for building the local runtime pipeline
     private final AlgebricksPipeline pipeline;
@@ -85,6 +93,68 @@
         }
     }
 
+    /**
+     * Builds the display name for the stats entry of one runtime factory.
+     * <p>
+     * Result shape: {@code base.pos - MicroOp name} when {@code subPlan < 0}, otherwise
+     * {@code base.pos.subPlan.subPos - Subplan name}. When {@code input >= 0}, the suffix
+     * {@code " input [<input>] "} (note the trailing space) is appended.
+     */
+    private static String makeStatName(String base, String name, int pos, int input, int subPlan, int subPos) {
+        StringBuilder label = new StringBuilder();
+        label.append(base).append(".").append(pos);
+        if (subPlan < 0) {
+            label.append(" - MicroOp ");
+        } else {
+            label.append(".").append(subPlan).append(".").append(subPos).append(" - Subplan ");
+        }
+        label.append(name);
+        if (input < 0) {
+            return label.toString();
+        }
+        label.append(" input [").append(input).append("] ");
+        return label.toString();
+    }
+
+    private static String makeId(String base, int id, int subPlan, int subPos) {
+        return base + "." + id + (subPlan < 0 ? "" : "." + subPlan) + (subPos < 0 ? "" : "." + subPos);
+    }
+
+    private static IOperatorStats makeStatForRuntimeFact(IPushRuntimeFactory factory, String base, String baseId,
+            int pos, int subPlan, int subPos) {
+        String statName = makeStatName(base, factory.toString(), pos, -1, subPlan, subPos);
+        return new OperatorStats(statName, makeId(baseId, pos, subPlan, subPos));
+    }
+
+    public static Map<IPushRuntimeFactory, IOperatorStats> makeMicroOpStats(AlgebricksPipeline pipe,
+            IOperatorStats outerStats) {
+        Map<IPushRuntimeFactory, IOperatorStats> microOpStats = new HashMap<>();
+        String baseName = outerStats.getName().split(" - ")[0];
+        String baseId = outerStats.getOperatorId();
+        List<SubplanRuntimeFactory> subplans = new ArrayList<>();
+        for (int i = 0; i < pipe.getRuntimeFactories().length; i++) {
+            IPushRuntimeFactory fact = pipe.getRuntimeFactories()[i];
+            //TODO: don't use instanceof
+            if (fact instanceof SubplanRuntimeFactory) {
+                SubplanRuntimeFactory subplanFact = (SubplanRuntimeFactory) fact;
+                subplans.add(subplanFact);
+                List<AlgebricksPipeline> pipelines = subplanFact.getPipelines();
+                // walk by position: indexOf(p) rescans the list and only ever yields the first equal pipeline
+                for (int k = 0; k < pipelines.size(); k++) {
+                    IPushRuntimeFactory[] nested = pipelines.get(k).getRuntimeFactories();
+                    for (int j = nested.length - 1; j > 0; j--) {
+                        microOpStats.put(nested[j], makeStatForRuntimeFact(nested[j], baseName, baseId, i, k, j));
+                    }
+                }
+            }
+            microOpStats.put(fact, makeStatForRuntimeFact(fact, baseName, baseId, i, -1, -1));
+        }
+        for (SubplanRuntimeFactory sub : subplans) {
+            sub.setStats(microOpStats);
+        }
+        return microOpStats;
+    }
+
     private class SourcePushRuntime extends AbstractUnaryOutputSourceOperatorNodePushable {
         private final IHyracksTaskContext ctx;
 
@@ -99,7 +169,7 @@
                     outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.outRecDescs[0] : null;
             PipelineAssembler pa =
                     new PipelineAssembler(pipeline, inputArity, outputArity, null, pipelineOutputRecordDescriptor);
-            startOfPipeline = pa.assemblePipeline(writer, ctx);
+            startOfPipeline = pa.assemblePipeline(writer, ctx, new HashMap<>());
             HyracksDataException exception = null;
             try {
                 startOfPipeline.open();
@@ -126,16 +196,18 @@
         public String getDisplayName() {
             return "Empty Tuple Source";
         }
+
     }
 
     private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider) {
-        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+        return new AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable() {
 
             private IFrameWriter startOfPipeline;
             private boolean opened = false;
+            private IOperatorStats parentStats = NoOpOperatorStats.INSTANCE;
+            private Map<IPushRuntimeFactory, IOperatorStats> microOpStats = new HashMap<>();
 
-            @Override
             public void open() throws HyracksDataException {
                 if (startOfPipeline == null) {
                     RecordDescriptor pipelineOutputRecordDescriptor =
@@ -144,7 +216,7 @@
                             .getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
                     PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
                             pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
-                    startOfPipeline = pa.assemblePipeline(writer, ctx);
+                    startOfPipeline = pa.assemblePipeline(writer, ctx, microOpStats);
                 }
                 opened = true;
                 startOfPipeline.open();
@@ -175,9 +247,38 @@
             }
 
             @Override
+            public void deinitialize() throws HyracksDataException {
+                super.deinitialize();
+            }
+
+            @Override
             public String toString() {
                 return AlgebricksMetaOperatorDescriptor.this.toString();
             }
+
+            @Override
+            public void addStats(IOperatorStats stats) throws HyracksDataException {
+                microOpStats = makeMicroOpStats(pipeline, stats);
+                for (IOperatorStats stat : microOpStats.values()) {
+                    ctx.getStatsCollector().add(stat);
+                }
+            }
+
+            @Override
+            public void setUpstreamStats(IOperatorStats stats) {
+                parentStats = stats;
+            }
+
+            @Override
+            public long getTotalTime() {
+                return startOfPipeline instanceof ITimedWriter ? ((ITimedWriter) startOfPipeline).getTotalTime() : 0;
+            }
+
+            @Override
+            public IOperatorStats getStats() {
+                IPushRuntimeFactory[] facts = pipeline.getRuntimeFactories();
+                return microOpStats.getOrDefault(facts[facts.length - 1], NoOpOperatorStats.INSTANCE);
+            }
         };
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 858fcfa..202c087 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -25,6 +25,8 @@
 import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
@@ -32,6 +34,7 @@
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
 
 public class PipelineAssembler {
 
@@ -55,6 +58,11 @@
     }
 
     public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException {
+        return assemblePipeline(writer, ctx, new HashMap<>());
+    }
+
+    public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx,
+            Map<IPushRuntimeFactory, IOperatorStats> microOpStats) throws HyracksDataException {
         // should enforce protocol
         boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
@@ -67,7 +75,21 @@
             IPushRuntimeFactory runtimeFactory = runtimeFactories[i];
             IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx);
             for (int j = 0; j < newRuntimes.length; j++) {
-                if (enforce) {
+                //ETS is wrapped externally, and doesn't need the micro-op wrapper since it isn't a pipeline
+                //we also want to avoid any instances of NoOp stats in the pipeline that snuck in somehow
+                boolean shouldProfile = profile && !(runtimeFactory instanceof EmptyTupleSourceRuntimeFactory)
+                        && microOpStats.containsKey(runtimeFactory);
+                if (shouldProfile) {
+                    ProfiledPushRuntime profiled;
+                    if (j == 0) {
+                        profiled = (ProfiledPushRuntime) ProfiledPushRuntime.time(newRuntimes[j],
+                                microOpStats.get(runtimeFactory));
+                    } else {
+                        profiled = (ProfiledPushRuntime) ProfiledPushRuntime.time(newRuntimes[j],
+                                microOpStats.get(runtimeFactory), false);
+                    }
+                    newRuntimes[j] = profiled;
+                } else if (enforce && !profile) {
                     newRuntimes[j] = EnforcePushRuntime.enforce(newRuntimes[j]);
                 }
                 if (i == runtimeFactories.length - 1) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 349e50f..7feca3c 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -20,7 +20,9 @@
 
 import java.io.DataOutput;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
@@ -32,16 +34,19 @@
 import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ProfiledFrameWriter;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
 public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 3L;
 
     private final List<AlgebricksPipeline> pipelines;
 
@@ -51,6 +56,8 @@
 
     private final IMissingWriterFactory[] missingWriterFactories;
 
+    private final Map<IPushRuntimeFactory, IOperatorStats> stats;
+
     public SubplanRuntimeFactory(List<AlgebricksPipeline> pipelines, IMissingWriterFactory[] missingWriterFactories,
             RecordDescriptor inputRecordDesc, RecordDescriptor outputRecordDesc, int[] projectionList) {
         super(projectionList);
@@ -61,6 +68,7 @@
         if (projectionList != null) {
             throw new NotImplementedException();
         }
+        this.stats = new HashMap<>();
     }
 
     @Override
@@ -78,6 +86,14 @@
         return sb.toString();
     }
 
+    public List<AlgebricksPipeline> getPipelines() {
+        return pipelines;
+    }
+
+    public void setStats(Map<IPushRuntimeFactory, IOperatorStats> stats) {
+        this.stats.putAll(stats);
+    }
+
     @Override
     public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
             throws HyracksDataException {
@@ -92,8 +108,11 @@
 
         boolean first;
 
+        boolean profile;
+
         SubplanPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
             this.ctx = ctx;
+            this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
             this.first = true;
 
             IMissingWriter[] missingWriters = new IMissingWriter[missingWriterFactories.length];
@@ -114,6 +133,11 @@
                 if (i == 0) {
                     // primary pipeline
                     outputWriter = new TupleOuterProduct(pipelineLastRecordDescriptor, missingWriters);
+                    //this is necessary to track the output of the last operator to the outer product,
+                    //i.e. the last real operator in pipeline 0 of the subplan
+                    if (profile) {
+                        outputWriter = new ProfiledFrameWriter(outputWriter);
+                    }
                     outputRecordDescriptor = SubplanRuntimeFactory.this.outputRecordDesc;
                 } else {
                     // secondary pipeline
@@ -127,7 +151,8 @@
                 }
 
                 PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor);
-                startOfPipelines[i] = (NestedTupleSourceRuntime) pa.assemblePipeline(outputWriter, ctx);
+                IFrameWriter head = pa.assemblePipeline(outputWriter, ctx, stats);
+                startOfPipelines[i] = (NestedTupleSourceRuntime) head;
                 pipelineAssemblers[i] = pa;
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ISelfProfilingNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ISelfProfilingNodePushable.java
new file mode 100644
index 0000000..7402179
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ISelfProfilingNodePushable.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.dataflow;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+
+public interface ISelfProfilingNodePushable extends IStatsContainingNodePushable {
+    void addStats(IOperatorStats stats) throws HyracksDataException;
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IStatsContainingNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IStatsContainingNodePushable.java
new file mode 100644
index 0000000..96a3ae9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IStatsContainingNodePushable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+
+public interface IStatsContainingNodePushable {
+    IOperatorStats getStats();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java
new file mode 100644
index 0000000..7b0f8c8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+
+public interface ITimedWriter extends IFrameWriter {
+    void setUpstreamStats(IOperatorStats stats);
+
+    long getTotalTime();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
index dc53bca..cfb0e7b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.api.dataflow;
 
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.com.job.profiling.counters.Counter;
@@ -28,30 +30,33 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
 import org.apache.hyracks.api.job.profiling.OperatorStats;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
 import org.apache.hyracks.api.util.HyracksRunnable;
 import org.apache.hyracks.api.util.HyracksThrowingConsumer;
 import org.apache.hyracks.util.IntSerDeUtils;
 
-public class ProfiledFrameWriter implements IFrameWriter {
+public class ProfiledFrameWriter implements ITimedWriter {
 
     // The downstream data consumer of this writer.
     private final IFrameWriter writer;
-    private final ICounter tupleCounter;
-    private final IOperatorStats parentStats;
+    protected IOperatorStats upstreamStats = NoOpOperatorStats.INSTANCE;
     private int minSz = Integer.MAX_VALUE;
     private int maxSz = -1;
     private long avgSz;
     private ICounter totalTime;
 
-    public ProfiledFrameWriter(IFrameWriter writer, IOperatorStats parentStats) {
+    public ProfiledFrameWriter(IFrameWriter writer) {
         this.writer = writer;
-        this.parentStats = parentStats;
-        this.tupleCounter = parentStats != null ? parentStats.getTupleCounter() : null;
         this.totalTime = new Counter("totalTime");
     }
 
+    @Override
+    public void setUpstreamStats(IOperatorStats stats) {
+        this.upstreamStats = stats != null ? stats : NoOpOperatorStats.INSTANCE; // must stay non-null
+    }
+
     public static void timeMethod(HyracksRunnable r, ICounter c) throws HyracksDataException {
         long nt = 0;
         try {
@@ -80,25 +85,24 @@
     private void updateTupleStats(ByteBuffer buffer) {
         int tupleCountOffset = FrameHelper.getTupleCountOffset(buffer.limit());
         int tupleCount = IntSerDeUtils.getInt(buffer.array(), tupleCountOffset);
-        if (tupleCounter != null) {
-            long prevCount = tupleCounter.get();
-            for (int i = 0; i < tupleCount; i++) {
-                int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
-                if (maxSz < tupleLen) {
-                    maxSz = tupleLen;
-                }
-                if (minSz > tupleLen) {
-                    minSz = tupleLen;
-                }
-                long prev = avgSz * prevCount;
-                avgSz = (prev + tupleLen) / (prevCount + 1);
-                prevCount++;
+        ICounter tupleCounter = upstreamStats.getTupleCounter();
+        long prevCount = tupleCounter.get();
+        for (int i = 0; i < tupleCount; i++) {
+            int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
+            if (maxSz < tupleLen) {
+                maxSz = tupleLen;
             }
-            parentStats.getMaxTupleSz().set(maxSz);
-            parentStats.getMinTupleSz().set(minSz);
-            parentStats.getAverageTupleSz().set(avgSz);
-            tupleCounter.update(tupleCount);
+            if (minSz > tupleLen) {
+                minSz = tupleLen;
+            }
+            long prev = avgSz * prevCount;
+            avgSz = (prev + tupleLen) / (prevCount + 1);
+            prevCount++;
         }
+        upstreamStats.getMaxTupleSz().set(maxSz);
+        upstreamStats.getMinTupleSz().set(minSz);
+        upstreamStats.getAverageTupleSz().set(avgSz);
+        tupleCounter.update(tupleCount);
     }
 
     @Override
@@ -140,14 +144,15 @@
             throws HyracksDataException {
         if (!(writer instanceof ProfiledFrameWriter)) {
             IStatsCollector statsCollector = ctx.getStatsCollector();
-            IOperatorStats stats = new OperatorStats(name);
+            IOperatorStats stats = new OperatorStats(name, INVALID_ODID);
             statsCollector.add(stats);
-            return new ProfiledFrameWriter(writer, null);
-
-        } else
+            return new ProfiledFrameWriter(writer);
+        } else {
             return writer;
+        }
     }
 
+    @Override
     public long getTotalTime() {
         return totalTime.get();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
index bde5611..1984d8e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
@@ -32,20 +32,16 @@
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
 import org.apache.hyracks.api.rewriter.runtime.SuperActivityOperatorNodePushable;
 
-public class ProfiledOperatorNodePushable implements IOperatorNodePushable {
+public class ProfiledOperatorNodePushable implements IOperatorNodePushable, IStatsContainingNodePushable {
 
     private final IOperatorNodePushable op;
-    private final Map<Integer, ProfiledFrameWriter> inputs;
-    private final Map<Integer, ProfiledOperatorNodePushable> parents;
-    private final Map<Integer, ProfiledFrameWriter> outputs;
+    private final Map<Integer, ITimedWriter> inputs;
+    private final Map<Integer, ITimedWriter> outputs;
     private final IOperatorStats stats;
     private final ICounter totalTime;
 
-    ProfiledOperatorNodePushable(IOperatorNodePushable op, IOperatorStats stats,
-            ProfiledOperatorNodePushable parentOp) {
+    ProfiledOperatorNodePushable(IOperatorNodePushable op, IOperatorStats stats) {
         this.stats = stats;
-        this.parents = new HashMap<>();
-        parents.put(0, parentOp);
         this.op = op;
         inputs = new HashMap<>();
         outputs = new HashMap<>();
@@ -60,10 +56,10 @@
     @Override
     public void deinitialize() throws HyracksDataException {
         long ownTime = totalTime.get();
-        for (ProfiledFrameWriter i : inputs.values()) {
+        for (ITimedWriter i : inputs.values()) {
             ownTime += i.getTotalTime();
         }
-        for (ProfiledFrameWriter w : outputs.values()) {
+        for (ITimedWriter w : outputs.values()) {
             ownTime -= w.getTotalTime();
         }
         op.deinitialize();
@@ -78,8 +74,13 @@
     @Override
     public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
             throws HyracksDataException {
-        if (writer instanceof ProfiledFrameWriter) {
-            ProfiledFrameWriter wrapper = (ProfiledFrameWriter) writer;
+        if (writer instanceof ITimedWriter) {
+            ITimedWriter wrapper = (ITimedWriter) writer;
+            if (op instanceof ISelfProfilingNodePushable) {
+                wrapper.setUpstreamStats(((ISelfProfilingNodePushable) op).getStats());
+            } else {
+                wrapper.setUpstreamStats(stats);
+            }
             outputs.put(index, wrapper);
         }
         op.setOutputFrameWriter(index, writer, recordDesc);
@@ -88,8 +89,7 @@
     @Override
     public IFrameWriter getInputFrameWriter(int index) {
         if (inputs.get(index) == null) {
-            IOperatorStats parentStats = parents.get(index) == null ? null : parents.get(index).getStats();
-            ProfiledFrameWriter pfw = new ProfiledFrameWriter(op.getInputFrameWriter(index), parentStats);
+            ProfiledFrameWriter pfw = new ProfiledFrameWriter(op.getInputFrameWriter(index));
             inputs.put(index, pfw);
             return pfw;
         } else {
@@ -102,25 +102,26 @@
         return op.getDisplayName();
     }
 
-    public void addParent(int index, ProfiledOperatorNodePushable parent) {
-        parents.put(index, parent);
-    }
-
     public IOperatorStats getStats() {
         return stats;
     }
 
-    public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx, ActivityId acId,
-            ProfiledOperatorNodePushable source) throws HyracksDataException {
+    public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx, ActivityId acId)
+            throws HyracksDataException {
         String name = acId.toString() + " - " + op.getDisplayName();
         IStatsCollector statsCollector = ctx.getStatsCollector();
-        IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId());
-        statsCollector.add(stats);
+        IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId().toString());
+        if (!(op instanceof ISelfProfilingNodePushable)) {
+            statsCollector.add(stats);
+        }
         if (op instanceof IIntrospectingOperator) {
             ((IIntrospectingOperator) op).setOperatorStats(stats);
         }
+        if (op instanceof ISelfProfilingNodePushable) {
+            ((ISelfProfilingNodePushable) op).addStats(stats);
+        }
         if (!(op instanceof ProfiledOperatorNodePushable) && !(op instanceof SuperActivityOperatorNodePushable)) {
-            return new ProfiledOperatorNodePushable(op, stats, source);
+            return new ProfiledOperatorNodePushable(op, stats);
         }
         return op;
     }
@@ -129,7 +130,7 @@
             throws HyracksDataException {
         String name = acId.toString() + " - " + op.getDisplayName();
         IStatsCollector statsCollector = ctx.getStatsCollector();
-        IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId());
+        IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId().toString());
         if (op instanceof IIntrospectingOperator) {
             ((IIntrospectingOperator) op).setOperatorStats(stats);
             statsCollector.add(stats);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
index 26b3a58..7770c4f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
@@ -18,10 +18,11 @@
  */
 package org.apache.hyracks.api.job.profiling;
 
+import java.io.DataInput;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.io.IWritable;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
 
@@ -86,11 +87,22 @@
 
     ICounter getBytesWritten();
 
-    OperatorDescriptorId getId();
+    String getOperatorId();
 
     void updateIndexesStats(Map<String, IndexStats> indexesStats);
 
     Map<String, IndexStats> getIndexesStats();
 
     void updateFrom(IOperatorStats stats);
+
+    static IOperatorStats create(DataInput input) throws IOException {
+        String name = input.readUTF();
+        String operatorId = input.readUTF(); // always present: NoOpOperatorStats.writeFields emits an id too
+        if (NoOpOperatorStats.NOOP_NAME.equals(name)) {
+            return NoOpOperatorStats.INSTANCE;
+        }
+        OperatorStats operatorStats = new OperatorStats(name, operatorId);
+        operatorStats.readFields(input);
+        return operatorStats;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java
index 4c86a15..d427d14 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java
@@ -24,14 +24,15 @@
 import java.util.Collections;
 import java.util.Map;
 
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
 
 public class NoOpOperatorStats implements IOperatorStats {
 
-    private static final long serialVersionUID = 9055940822300360135L;
+    private static final long serialVersionUID = 9055940222300360256L;
 
     public static final NoOpOperatorStats INSTANCE = new NoOpOperatorStats();
+    public static final String INVALID_ODID = "ODID:-1";
+    public static final String NOOP_NAME = "NoOp";
 
     private static final ICounter NOOP_COUNTER = new ICounter() {
         private static final long serialVersionUID = 1L;
@@ -57,21 +58,20 @@
         }
     };
 
-    private static final OperatorDescriptorId INVALID_ODID = new OperatorDescriptorId(-1);
-
     @Override
     public void writeFields(DataOutput output) throws IOException {
-
+        output.writeUTF(NOOP_NAME);
+        output.writeUTF(INVALID_ODID);
     }
 
     @Override
     public void readFields(DataInput input) throws IOException {
-
+        // nothing
     }
 
     @Override
     public String getName() {
-        return "";
+        return NOOP_NAME;
     }
 
     @Override
@@ -130,7 +130,7 @@
     }
 
     @Override
-    public OperatorDescriptorId getId() {
+    public String getOperatorId() {
         return INVALID_ODID;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
index c9f08e0..412b788 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
@@ -25,14 +25,13 @@
 import java.util.Map;
 
 import org.apache.hyracks.api.com.job.profiling.counters.Counter;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
 
 public class OperatorStats implements IOperatorStats {
-    private static final long serialVersionUID = 6401830963367567169L;
-
+    private static final long serialVersionUID = 6401830963361567126L;
     public final String operatorName;
-    public OperatorDescriptorId id;
+
+    public final String operatorId;
     public final ICounter tupleCounter;
     public final ICounter timeCounter;
     public final ICounter pageReads;
@@ -48,12 +47,12 @@
 
     //TODO: this is quickly becoming gross it should just be a map where the value is obliged to be a Counter
 
-    public OperatorStats(String operatorName, OperatorDescriptorId id) {
+    public OperatorStats(String operatorName, String operatorId) {
         if (operatorName == null || operatorName.isEmpty()) {
             throw new IllegalArgumentException("operatorName must not be null or empty");
         }
         this.operatorName = operatorName;
-        this.id = id;
+        this.operatorId = operatorId;
         tupleCounter = new Counter("tupleCounter");
         timeCounter = new Counter("timeCounter");
         pageReads = new Counter("diskIoCounter");
@@ -69,17 +68,6 @@
         indexesStats = new HashMap<>();
     }
 
-    public OperatorStats(String operatorName) {
-        this(operatorName, new OperatorDescriptorId(-1));
-    }
-
-    public static IOperatorStats create(DataInput input) throws IOException {
-        String name = input.readUTF();
-        OperatorStats operatorStats = new OperatorStats(name);
-        operatorStats.readFields(input);
-        return operatorStats;
-    }
-
     @Override
     public String getName() {
         return operatorName;
@@ -141,8 +129,8 @@
     }
 
     @Override
-    public OperatorDescriptorId getId() {
-        return id;
+    public String getOperatorId() {
+        return operatorId;
     }
 
     @Override
@@ -178,7 +166,7 @@
     @Override
     public void writeFields(DataOutput output) throws IOException {
         output.writeUTF(operatorName);
-        id.writeFields(output);
+        output.writeUTF(operatorId);
         output.writeLong(tupleCounter.get());
         output.writeLong(timeCounter.get());
         output.writeLong(pageReads.get());
@@ -195,7 +183,6 @@
 
     @Override
     public void readFields(DataInput input) throws IOException {
-        id = OperatorDescriptorId.create(input);
         tupleCounter.set(input.readLong());
         timeCounter.set(input.readLong());
         pageReads.set(input.readLong());
@@ -229,13 +216,13 @@
 
     @Override
     public String toString() {
-        return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"" + tupleCounter.getName() + "\": "
-                + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": " + timeCounter.get() + ", \""
-                + coldReadCounter.getName() + "\": " + coldReadCounter.get() + avgTupleSz.getName() + "\": "
-                + avgTupleSz.get() + ", \"" + minTupleSz.getName() + "\": " + minTupleSz.get() + ", \""
-                + minTupleSz.getName() + "\": " + timeCounter.get() + ", \"" + inputTupleCounter.getName() + "\": "
-                + bytesRead.get() + ", \"" + bytesRead.getName() + "\": " + bytesWritten.get() + ", \""
-                + bytesWritten.getName() + "\": " + inputTupleCounter.get() + ", \"" + level.getName() + "\": "
-                + level.get() + ", \"indexStats\": \"" + indexesStats + "\" }";
+        return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"id\": \"" + operatorId + "\", " + "\""
+                + tupleCounter.getName() + "\": " + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": "
+                + timeCounter.get() + ", \"" + coldReadCounter.getName() + "\": " + coldReadCounter.get() + ", \""
+                + avgTupleSz.getName() + "\": " + avgTupleSz.get() + ", \"" + minTupleSz.getName() + "\": "
+                + minTupleSz.get() + ", \"" + inputTupleCounter.getName() + "\": " + inputTupleCounter.get() + ", \""
+                + bytesRead.getName() + "\": " + bytesRead.get() + ", \"" + bytesWritten.getName() + "\": "
+                + bytesWritten.get() + ", \"" + level.getName() + "\": " + level.get() + ", \"indexStats\": \""
+                + indexesStats + "\" }"; // every label is followed by its own counter's value
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 4261c1e..17f5cb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -109,11 +109,11 @@
          */
         Set<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> sources = new HashSet<>();
         for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
-            IOperatorNodePushable opPushable = null;
+            IOperatorNodePushable opPushable;
             if (profile) {
                 IOperatorNodePushable wrapped =
                         entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-                opPushable = ProfiledOperatorNodePushable.time(wrapped, ctx, entry.getKey(), null);
+                opPushable = ProfiledOperatorNodePushable.time(wrapped, ctx, entry.getKey());
             } else {
                 opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
                 ProfiledOperatorNodePushable.onlyAddStats(opPushable, ctx, entry.getKey());
@@ -147,12 +147,7 @@
                 if (profile) {
                     IOperatorNodePushable wrapped = channel.getRight().getLeft().createPushRuntime(ctx,
                             recordDescProvider, partition, nPartitions);
-                    if (sourceOp instanceof ProfiledOperatorNodePushable) {
-                        destOp = ProfiledOperatorNodePushable.time(wrapped, ctx, destId,
-                                (ProfiledOperatorNodePushable) sourceOp);
-                    } else {
-                        destOp = ProfiledOperatorNodePushable.time(wrapped, ctx, destId, null);
-                    }
+                    destOp = ProfiledOperatorNodePushable.time(wrapped, ctx, destId);
                 } else {
                     destOp = channel.getRight().getLeft().createPushRuntime(ctx, recordDescProvider, partition,
                             nPartitions);
@@ -160,12 +155,6 @@
                 }
                 operatorNodePushablesBFSOrder.add(destOp);
                 operatorNodePushables.put(destId, destOp);
-            } else if (profile) {
-                if (destOp instanceof ProfiledOperatorNodePushable
-                        && sourceOp instanceof ProfiledOperatorNodePushable) {
-                    ((ProfiledOperatorNodePushable) destOp).addParent(inputChannel,
-                            (ProfiledOperatorNodePushable) sourceOp);
-                }
             }
 
             /*
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
index 26c664a..7cfbda0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.control.common.job.profiling;
 
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -31,7 +33,7 @@
 import org.apache.hyracks.api.job.profiling.OperatorStats;
 
 public class StatsCollector implements IStatsCollector {
-    private static final long serialVersionUID = 6858817639895434574L;
+    private static final long serialVersionUID = 6858817639895434379L;
 
     private final Map<String, IOperatorStats> operatorStatsMap = new LinkedHashMap<>();
 
@@ -61,7 +63,7 @@
 
     @Override
     public IOperatorStats getAggregatedStats() {
-        IOperatorStats aggregatedStats = new OperatorStats("aggregated");
+        IOperatorStats aggregatedStats = new OperatorStats("aggregated", INVALID_ODID);
         for (IOperatorStats stats : operatorStatsMap.values()) {
             aggregatedStats.getInputTupleCounter().update(stats.getInputTupleCounter().get());
             aggregatedStats.getTimeCounter().update(stats.getTimeCounter().get());
@@ -74,8 +76,8 @@
     @Override
     public void writeFields(DataOutput output) throws IOException {
         output.writeInt(operatorStatsMap.size());
-        for (IOperatorStats operatorStats : operatorStatsMap.values()) {
-            operatorStats.writeFields(output);
+        for (IOperatorStats stats : operatorStatsMap.values()) {
+            stats.writeFields(output);
         }
     }
 
@@ -83,7 +85,7 @@
     public void readFields(DataInput input) throws IOException {
         int operatorCount = input.readInt();
         for (int i = 0; i < operatorCount; i++) {
-            IOperatorStats opStats = OperatorStats.create(input);
+            IOperatorStats opStats = IOperatorStats.create(input);
             operatorStatsMap.put(opStats.getName(), opStats);
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
index 871d2bc..40bc1ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.control.common.job.profiling.om;
 
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -184,7 +186,7 @@
                 }
                 IOperatorStats opOutStats = outStats[i];
                 if (opOutStats == null) {
-                    opOutStats = new OperatorStats(operatorName);
+                    opOutStats = new OperatorStats(operatorName, INVALID_ODID);
                     outStats[i] = opOutStats;
                 }
                 opOutStats.updateFrom(opTaskStats);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 546360a..84376f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.control.common.job.profiling.om;
 
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -130,8 +132,8 @@
             jpe.put("name", key);
             jpe.put("run-time", Double
                     .parseDouble(new DecimalFormat("#.####").format((double) value.getTimeCounter().get() / 1000000)));
-            if (value.getId().getId() >= 0) {
-                jpe.put("runtime-id", value.getId().toString());
+            if (!value.getOperatorId().equals(INVALID_ODID)) {
+                jpe.put("runtime-id", value.getOperatorId());
             }
             if (value.getPageReads().get() > 0) {
                 jpe.put("pages-read", value.getPageReads().get());
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable.java
new file mode 100644
index 0000000..7f0f5a9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.base;
+
+import org.apache.hyracks.api.dataflow.ISelfProfilingNodePushable;
+import org.apache.hyracks.api.dataflow.ITimedWriter;
+
+public abstract class AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable
+        extends AbstractUnaryInputUnaryOutputOperatorNodePushable implements ISelfProfilingNodePushable, ITimedWriter {
+
+}