[ASTERIXDB-3100] Fix profiling for intersect

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

Change the profiler to not use the stack-based time tracking,
and rather use subtracting the total time sequentially from
each upstream operator to determine the time taken for each
individual operator.

Change-Id: Icf2a8f66f39e39eb6a39506c9f385c623176a87c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17319
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java
index b056d2c..56169e7 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java
@@ -57,7 +57,7 @@
 
     @Parameters(name = "SqlppProfiledExecutionTest {index}: {0}")
     public static Collection<Object[]> tests() throws Exception {
-        return LangExecutionUtil.tests("only_sqlpp.xml", "testsuite_sqlpp_profiled.xml");
+        return LangExecutionUtil.tests("only_sqlpp.xml", "testsuite_sqlpp.xml");
     }
 
     protected TestCaseContext tcCtx;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
index 0c4fae1..88107b0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
@@ -34,12 +34,12 @@
                     "counters": [
                         {
                             "name": "R{.+}",
-                            "time": "R{[0-9.]+}",
+                            "run-time": "R{[0-9.]+}",
                             "runtime-id": "R{.+}"
                         },
                         {
                             "name": "R{.+}",
-                            "time": "R{[0-9.]+}",
+                            "run-time": "R{[0-9.]+}",
                             "runtime-id": "R{.+}",
                             "pages-read": "R{[0-9.]+}",
                             "pages-read-cold": "R{[0-9.]+}",
@@ -50,7 +50,7 @@
                         },
                         {
                             "name": "R{.+}",
-                            "time": "R{[0-9.]+}",
+                            "run-time": "R{[0-9.]+}",
                             "runtime-id": "R{.+}",
                             "cardinality-out": "R{[0-9.]+}",
                             "avg-tuple-size": "R{[0-9.]+}",
@@ -67,7 +67,7 @@
                   "counters": [
                     {
                         "name": "R{.+}",
-                        "time": "R{[0-9.]+}",
+                        "run-time": "R{[0-9.]+}",
                         "runtime-id": "R{.+}",
                         "cardinality-out": "R{[0-9.]+}",
                         "avg-tuple-size": "R{[0-9.]+}",
@@ -76,7 +76,7 @@
                     },
                     {
                         "name": "R{.+}",
-                        "time": "R{[0-9.]+}",
+                        "run-time": "R{[0-9.]+}",
                         "runtime-id": "R{.+}"
                     }
                   ]
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
index 5b9495a..6384e49 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
@@ -20,6 +20,7 @@
 
 import java.nio.ByteBuffer;
 
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
 import org.apache.hyracks.api.comm.FrameConstants;
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -29,22 +30,25 @@
 import org.apache.hyracks.api.job.profiling.IStatsCollector;
 import org.apache.hyracks.api.job.profiling.OperatorStats;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
+import org.apache.hyracks.api.util.HyracksConsumer;
+import org.apache.hyracks.api.util.HyracksRunnable;
 import org.apache.hyracks.util.IntSerDeUtils;
 
-public class ProfiledFrameWriter implements IFrameWriter, IPassableTimer {
+public class ProfiledFrameWriter implements IFrameWriter {
 
     // The downstream data consumer of this writer.
     private final IFrameWriter writer;
-    private long frameStart = 0;
     final ICounter timeCounter;
     final ICounter tupleCounter;
     final IStatsCollector collector;
     final IOperatorStats stats;
     final IOperatorStats parentStats;
+
     private int minSz = Integer.MAX_VALUE;
     private int maxSz = -1;
     private long avgSz;
     final String name;
+    public ICounter totalTime;
 
     public ProfiledFrameWriter(IFrameWriter writer, IStatsCollector collector, String name, IOperatorStats stats,
             IOperatorStats parentStats) {
@@ -55,103 +59,77 @@
         this.parentStats = parentStats;
         this.timeCounter = stats.getTimeCounter();
         this.tupleCounter = parentStats != null ? parentStats.getTupleCounter() : null;
+        this.totalTime = new Counter("totalTime");
+    }
+
+    public static void timeMethod(HyracksRunnable r, ICounter c) throws HyracksDataException {
+        long nt = 0;
+        try {
+            nt = System.nanoTime();
+            r.run();
+        } finally {
+            c.update(System.nanoTime() - nt);
+        }
+    }
+
+    private void timeMethod(HyracksConsumer<ByteBuffer> c, ByteBuffer buffer) throws HyracksDataException {
+        long nt = 0;
+        try {
+            nt = System.nanoTime();
+            c.accept(buffer);
+        } finally {
+            totalTime.update(System.nanoTime() - nt);
+        }
     }
 
     @Override
     public final void open() throws HyracksDataException {
-        try {
-            startClock();
-            writer.open();
-        } finally {
-            stopClock();
+        timeMethod(writer::open, totalTime);
+    }
+
+    private void updateTupleStats(ByteBuffer buffer) {
+        int tupleCountOffset = FrameHelper.getTupleCountOffset(buffer.limit());
+        int tupleCount = IntSerDeUtils.getInt(buffer.array(), tupleCountOffset);
+        if (tupleCounter != null) {
+            long prevCount = tupleCounter.get();
+            for (int i = 0; i < tupleCount; i++) {
+                int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
+                if (maxSz < tupleLen) {
+                    maxSz = tupleLen;
+                }
+                if (minSz > tupleLen) {
+                    minSz = tupleLen;
+                }
+                long prev = avgSz * prevCount;
+                avgSz = (prev + tupleLen) / (prevCount + 1);
+                prevCount++;
+            }
+            parentStats.getMaxTupleSz().set(maxSz);
+            parentStats.getMinTupleSz().set(minSz);
+            parentStats.getAverageTupleSz().set(avgSz);
+            tupleCounter.update(tupleCount);
         }
     }
 
     @Override
     public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        try {
-            int tupleCountOffset = FrameHelper.getTupleCountOffset(buffer.limit());
-            int tupleCount = IntSerDeUtils.getInt(buffer.array(), tupleCountOffset);
-            if (tupleCounter != null) {
-                long prevCount = tupleCounter.get();
-                for (int i = 0; i < tupleCount; i++) {
-                    int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
-                    if (maxSz < tupleLen) {
-                        maxSz = tupleLen;
-                    }
-                    if (minSz > tupleLen) {
-                        minSz = tupleLen;
-                    }
-                    long prev = avgSz * prevCount;
-                    avgSz = (prev + tupleLen) / (prevCount + 1);
-                    prevCount++;
-                }
-                parentStats.getMaxTupleSz().set(maxSz);
-                parentStats.getMinTupleSz().set(minSz);
-                parentStats.getAverageTupleSz().set(avgSz);
-                tupleCounter.update(tupleCount);
-            }
-            startClock();
-            writer.nextFrame(buffer);
-        } finally {
-            stopClock();
-        }
+        updateTupleStats(buffer);
+        timeMethod(writer::nextFrame, buffer);
     }
 
     @Override
     public final void flush() throws HyracksDataException {
-        try {
-            startClock();
-            writer.flush();
-        } finally {
-            stopClock();
-        }
+        timeMethod(writer::flush, totalTime);
     }
 
     @Override
     public final void fail() throws HyracksDataException {
-        writer.fail();
+        timeMethod(writer::fail, totalTime);
     }
 
     @Override
     public void close() throws HyracksDataException {
-        try {
-            startClock();
-            writer.close();
-        } finally {
-            stopClock();
-        }
-    }
-
-    private void stopClock() {
-        pause();
-        collector.giveClock(this);
-    }
-
-    private void startClock() {
-        if (frameStart > 0) {
-            return;
-        }
-        frameStart = collector.takeClock(this);
-    }
-
-    @Override
-    public void resume() {
-        if (frameStart > 0) {
-            return;
-        }
-        long nt = System.nanoTime();
-        frameStart = nt;
-    }
-
-    @Override
-    public void pause() {
-        if (frameStart > 1) {
-            long nt = System.nanoTime();
-            long delta = nt - frameStart;
-            timeCounter.update(delta);
-            frameStart = -1;
-        }
+        timeMethod(writer::close, totalTime);
     }
 
     private int getTupleStartOffset(int tupleIndex, int tupleCountOffset, ByteBuffer buffer) {
@@ -179,4 +157,9 @@
         } else
             return writer;
     }
+
+    public long getTotalTime() {
+        return totalTime.get();
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
index f787a1c..facf049 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
@@ -19,7 +19,9 @@
 package org.apache.hyracks.api.dataflow;
 
 import java.util.HashMap;
+import java.util.Map;
 
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -27,43 +29,51 @@
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.job.profiling.IStatsCollector;
 import org.apache.hyracks.api.job.profiling.OperatorStats;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
 import org.apache.hyracks.api.rewriter.runtime.SuperActivityOperatorNodePushable;
 
-public class ProfiledOperatorNodePushable extends ProfiledFrameWriter implements IOperatorNodePushable, IPassableTimer {
+public class ProfiledOperatorNodePushable implements IOperatorNodePushable {
 
     IOperatorNodePushable op;
-    ProfiledOperatorNodePushable parentOp;
     ActivityId acId;
-    HashMap<Integer, IFrameWriter> inputs;
-    long frameStart;
+    Map<Integer, ProfiledFrameWriter> inputs;
+    Map<Integer, ProfiledOperatorNodePushable> parents;
+
+    Map<Integer, ProfiledFrameWriter> outputs;
+    IOperatorStats stats;
+    IStatsCollector collector;
+
+    ICounter totalTime;
 
     ProfiledOperatorNodePushable(IOperatorNodePushable op, ActivityId acId, IStatsCollector collector,
-            IOperatorStats stats, ActivityId parent, ProfiledOperatorNodePushable parentOp)
-            throws HyracksDataException {
-        super(null, collector, acId.toString() + " - " + op.getDisplayName(), stats,
-                parentOp != null ? parentOp.getStats() : null);
-        this.parentOp = parentOp;
+            IOperatorStats stats, ProfiledOperatorNodePushable parentOp) {
+        this.stats = stats;
+        this.collector = collector;
+        this.parents = new HashMap<>();
+        parents.put(0, parentOp);
         this.op = op;
         this.acId = acId;
         inputs = new HashMap<>();
+        outputs = new HashMap<>();
+        this.totalTime = new Counter("totalTime");
     }
 
     @Override
     public void initialize() throws HyracksDataException {
-        synchronized (collector) {
-            startClock();
-            op.initialize();
-            stopClock();
-        }
+        ProfiledFrameWriter.timeMethod(op::initialize, totalTime);
     }
 
     @Override
     public void deinitialize() throws HyracksDataException {
-        synchronized (collector) {
-            startClock();
-            op.deinitialize();
-            stopClock();
+        long unNestTime = totalTime.get();
+        for (ProfiledFrameWriter i : inputs.values()) {
+            unNestTime += i.getTotalTime();
         }
+        for (ProfiledFrameWriter w : outputs.values()) {
+            unNestTime -= w.getTotalTime();
+        }
+        op.deinitialize();
+        stats.getTimeCounter().set(unNestTime);
     }
 
     @Override
@@ -74,17 +84,24 @@
     @Override
     public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
             throws HyracksDataException {
+        if (writer instanceof ProfiledFrameWriter) {
+            ProfiledFrameWriter wrapper = (ProfiledFrameWriter) writer;
+            outputs.put(index, wrapper);
+        }
         op.setOutputFrameWriter(index, writer, recordDesc);
     }
 
     @Override
     public IFrameWriter getInputFrameWriter(int index) {
-        IFrameWriter ifw = op.getInputFrameWriter(index);
-        if (!(op instanceof ProfiledFrameWriter) && ifw.equals(op)) {
-            return new ProfiledFrameWriter(op.getInputFrameWriter(index), collector,
+        if (inputs.get(index) == null) {
+            IOperatorStats parentStats = parents.get(index) == null ? null : parents.get(index).getStats();
+            ProfiledFrameWriter pfw = new ProfiledFrameWriter(op.getInputFrameWriter(index), collector,
                     acId.toString() + "-" + op.getDisplayName(), stats, parentStats);
+            inputs.put(index, pfw);
+            return pfw;
+        } else {
+            return inputs.get(index);
         }
-        return op.getInputFrameWriter(index);
     }
 
     @Override
@@ -92,45 +109,14 @@
         return op.getDisplayName();
     }
 
-    private void stopClock() {
-        pause();
-        collector.giveClock(this);
-    }
-
-    private void startClock() {
-        if (frameStart > 0) {
-            return;
-        }
-        frameStart = collector.takeClock(this);
-    }
-
-    @Override
-    public void resume() {
-        if (frameStart > 0) {
-            return;
-        }
-        long nt = System.nanoTime();
-        frameStart = nt;
-    }
-
-    @Override
-    public void pause() {
-        if (frameStart > 0) {
-            long nt = System.nanoTime();
-            long delta = nt - frameStart;
-            timeCounter.update(delta);
-            frameStart = -1;
-        }
+    public void addParent(int index, ProfiledOperatorNodePushable parent) {
+        parents.put(index, parent);
     }
 
     public IOperatorStats getStats() {
         return stats;
     }
 
-    public IOperatorStats getParentStats() {
-        return parentStats;
-    }
-
     public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx, ActivityId acId,
             ProfiledOperatorNodePushable source) throws HyracksDataException {
         String name = acId.toString() + " - " + op.getDisplayName();
@@ -141,7 +127,7 @@
             ((IIntrospectingOperator) op).setOperatorStats(stats);
         }
         if (!(op instanceof ProfiledOperatorNodePushable) && !(op instanceof SuperActivityOperatorNodePushable)) {
-            return new ProfiledOperatorNodePushable(op, acId, ctx.getStatsCollector(), stats, acId, source);
+            return new ProfiledOperatorNodePushable(op, acId, ctx.getStatsCollector(), stats, source);
         }
         return op;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
index d2ad20c..3169c81 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
@@ -21,7 +21,6 @@
 import java.io.Serializable;
 import java.util.Map;
 
-import org.apache.hyracks.api.dataflow.IPassableTimer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IWritable;
 
@@ -54,17 +53,4 @@
      */
     IOperatorStats getAggregatedStats();
 
-    /**
-     * Pause an operator's timer, to pass it to another operator
-     * @param newHolder the timer that is starting execution
-     * @return the current nanoTime when the clock was taken from the other operator
-     */
-    long takeClock(IPassableTimer newHolder);
-
-    /**
-     * Resume an operator's timer, when a downstream operator has finished execution of
-     * the method the upstream operator called
-     * @param currHolder the timer that needs to be paused
-     */
-    void giveClock(IPassableTimer currHolder);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index efd4e07..de28318 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -158,6 +158,12 @@
                 }
                 operatorNodePushablesBFSOrder.add(destOp);
                 operatorNodePushables.put(destId, destOp);
+            } else if (profile) {
+                if (destOp instanceof ProfiledOperatorNodePushable
+                        && sourceOp instanceof ProfiledOperatorNodePushable) {
+                    ((ProfiledOperatorNodePushable) destOp).addParent(inputChannel,
+                            (ProfiledOperatorNodePushable) sourceOp);
+                }
             }
 
             /*
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConsumer.java
similarity index 64%
copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
copy to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConsumer.java
index 6afbccb..c5abb5e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConsumer.java
@@ -16,17 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.api.dataflow;
+package org.apache.hyracks.api.util;
 
-public interface IPassableTimer {
-    /*
-    A timer intended to be used for timing the individual components of a
-    pipelined process. An instance of IPassableTimer is held by each method
-    in the pipeline, and is paused() when that method passes off control to
-    a component above it, and is resume()d when the component above it returns.
-     */
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-    void pause();
-
-    void resume();
+@FunctionalInterface
+public interface HyracksConsumer<T> {
+    void accept(final T elem) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksRunnable.java
similarity index 64%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksRunnable.java
index 6afbccb..a82c150 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksRunnable.java
@@ -16,17 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.api.dataflow;
+package org.apache.hyracks.api.util;
 
-public interface IPassableTimer {
-    /*
-    A timer intended to be used for timing the individual components of a
-    pipelined process. An instance of IPassableTimer is held by each method
-    in the pipeline, and is paused() when that method passes off control to
-    a component above it, and is resume()d when the component above it returns.
-     */
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-    void pause();
-
-    void resume();
+@FunctionalInterface
+public interface HyracksRunnable {
+    void run() throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
index 41beac0..76c8017 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
@@ -21,23 +21,19 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayDeque;
 import java.util.Collections;
-import java.util.Deque;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-import org.apache.hyracks.api.dataflow.IPassableTimer;
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.job.profiling.IStatsCollector;
 import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
 import org.apache.hyracks.api.job.profiling.OperatorStats;
 
 public class StatsCollector implements IStatsCollector {
-    private static final long serialVersionUID = 6858817639895434572L;
+    private static final long serialVersionUID = 6858817639895434573L;
 
     private final Map<String, IOperatorStats> operatorStatsMap = new LinkedHashMap<>();
-    private transient Deque<IPassableTimer> clockHolder = new ArrayDeque<>();
 
     @Override
     public void add(IOperatorStats operatorStats) {
@@ -91,23 +87,4 @@
         }
     }
 
-    @Override
-    public long takeClock(IPassableTimer newHolder) {
-        if (newHolder != null) {
-            if (clockHolder.peek() != null) {
-                clockHolder.peek().pause();
-            }
-            clockHolder.push(newHolder);
-        }
-        return System.nanoTime();
-    }
-
-    @Override
-    public void giveClock(IPassableTimer currHolder) {
-        clockHolder.removeLastOccurrence(currHolder);
-        if (clockHolder.peek() != null) {
-            clockHolder.peek().resume();
-        }
-    }
-
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 4036f00..546360a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -128,7 +128,7 @@
         opTimes.forEach((key, value) -> {
             ObjectNode jpe = om.createObjectNode();
             jpe.put("name", key);
-            jpe.put("time", Double
+            jpe.put("run-time", Double
                     .parseDouble(new DecimalFormat("#.####").format((double) value.getTimeCounter().get() / 1000000)));
             if (value.getId().getId() >= 0) {
                 jpe.put("runtime-id", value.getId().toString());
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
index 815536b..6e58e9c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -30,14 +30,12 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger;
 import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.Algorithm;
 import org.apache.hyracks.dataflow.std.sort.IRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.ProfiledRunGenerator;
 
 /**
  * This Operator pushes group-by aggregation into the external sort.
@@ -143,13 +141,11 @@
             @Override
             protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
                     IRecordDescriptorProvider recordDescriptorProvider) throws HyracksDataException {
-                final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
                 IRunGenerator runGen = new ExternalSortGroupByRunGenerator(ctx, sortFields,
                         recordDescriptorProvider.getInputRecordDescriptor(this.getActivityId(), 0), framesLimit,
                         groupFields, keyNormalizerFactories, comparatorFactories, partialAggregatorFactory,
                         partialAggRecordDesc, ALG);
-                return profile ? ProfiledRunGenerator.time(runGen, ctx, "GroupBy (Sort Runs)", this.getActivityId())
-                        : runGen;
+                return runGen;
             }
         };
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
index 67b0686..dc2b46f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
@@ -224,6 +224,11 @@
         }
 
         @Override
+        public String getDisplayName() {
+            return "Intersect";
+        }
+
+        @Override
         public IFrameWriter getInputFrameWriter(final int index) {
             return new IFrameWriter() {
                 private final int[] normalizedKey1 =
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index ebe871b..38320ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -30,7 +30,6 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
 
@@ -79,11 +78,9 @@
             @Override
             protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
                     IRecordDescriptorProvider recordDescProvider) throws HyracksDataException {
-                final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
                 IRunGenerator runGen = new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories,
                         comparatorFactories, outRecDescs[0], alg, policy, framesLimit, outputLimit);
-                return profile ? ProfiledRunGenerator.time(runGen, ctx, "ExternalSort(Sort)", this.getActivityId())
-                        : runGen;
+                return runGen;
             }
         };
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ProfiledRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ProfiledRunGenerator.java
deleted file mode 100644
index 5cc1882..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ProfiledRunGenerator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.sort;
-
-import java.util.List;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.ProfiledFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.profiling.IOperatorStats;
-import org.apache.hyracks.api.job.profiling.IStatsCollector;
-import org.apache.hyracks.api.job.profiling.OperatorStats;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
-
-public class ProfiledRunGenerator extends ProfiledFrameWriter implements IRunGenerator {
-
-    private final IRunGenerator runGenerator;
-
-    private ProfiledRunGenerator(IRunGenerator runGenerator, IStatsCollector collector, String name,
-            IOperatorStats stats, ActivityId root) {
-        super(runGenerator, collector, name, stats, null);
-        this.runGenerator = runGenerator;
-    }
-
-    @Override
-    public List<GeneratedRunFileReader> getRuns() {
-        return runGenerator.getRuns();
-    }
-
-    @Override
-    public ISorter getSorter() {
-        return runGenerator.getSorter();
-    }
-
-    public static IRunGenerator time(IRunGenerator runGenerator, IHyracksTaskContext ctx, String name, ActivityId root)
-            throws HyracksDataException {
-        if (!(runGenerator instanceof ProfiledRunGenerator)) {
-            String statName = root.toString() + " - " + name;
-            IStatsCollector statsCollector = ctx.getStatsCollector();
-            IOperatorStats stats = new OperatorStats(statName);
-            statsCollector.add(stats);
-            return new ProfiledRunGenerator(runGenerator, ctx.getStatsCollector(), name, stats, root);
-        }
-        return runGenerator;
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
index 2322910..934ae60 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
@@ -29,9 +29,7 @@
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 
 public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescriptor {
@@ -63,16 +61,9 @@
             @Override
             protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
                     IRecordDescriptorProvider recordDescProvider) {
-                final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
                 IRunGenerator runGen = new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields,
                         keyNormalizerFactories, comparatorFactories, outRecDescs[0]);
-                try {
-                    return profile ? ProfiledRunGenerator.time(runGen, ctx, "TopKSort (Sort)", this.getActivityId())
-                            : runGen;
-                } catch (HyracksDataException e) {
-                    e.printStackTrace();
-                }
-                return null;
+                return runGen;
             }
         };
     }