[ASTERIXDB-3100] Fix profiling for intersect
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Change the profiler to not use the stack-based time tracking,
and rather use subtracting the total time sequentially from
each upstream operator to determine the time taken for each
individual operator.
Change-Id: Icf2a8f66f39e39eb6a39506c9f385c623176a87c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17319
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java
index b056d2c..56169e7 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppProfiledExecutionTest.java
@@ -57,7 +57,7 @@
@Parameters(name = "SqlppProfiledExecutionTest {index}: {0}")
public static Collection<Object[]> tests() throws Exception {
- return LangExecutionUtil.tests("only_sqlpp.xml", "testsuite_sqlpp_profiled.xml");
+ return LangExecutionUtil.tests("only_sqlpp.xml", "testsuite_sqlpp.xml");
}
protected TestCaseContext tcCtx;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
index 0c4fae1..88107b0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
@@ -34,12 +34,12 @@
"counters": [
{
"name": "R{.+}",
- "time": "R{[0-9.]+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}"
},
{
"name": "R{.+}",
- "time": "R{[0-9.]+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
"pages-read": "R{[0-9.]+}",
"pages-read-cold": "R{[0-9.]+}",
@@ -50,7 +50,7 @@
},
{
"name": "R{.+}",
- "time": "R{[0-9.]+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
"cardinality-out": "R{[0-9.]+}",
"avg-tuple-size": "R{[0-9.]+}",
@@ -67,7 +67,7 @@
"counters": [
{
"name": "R{.+}",
- "time": "R{[0-9.]+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
"cardinality-out": "R{[0-9.]+}",
"avg-tuple-size": "R{[0-9.]+}",
@@ -76,7 +76,7 @@
},
{
"name": "R{.+}",
- "time": "R{[0-9.]+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}"
}
]
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
index 5b9495a..6384e49 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
import org.apache.hyracks.api.comm.FrameConstants;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -29,22 +30,25 @@
import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.job.profiling.OperatorStats;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
+import org.apache.hyracks.api.util.HyracksConsumer;
+import org.apache.hyracks.api.util.HyracksRunnable;
import org.apache.hyracks.util.IntSerDeUtils;
-public class ProfiledFrameWriter implements IFrameWriter, IPassableTimer {
+public class ProfiledFrameWriter implements IFrameWriter {
// The downstream data consumer of this writer.
private final IFrameWriter writer;
- private long frameStart = 0;
final ICounter timeCounter;
final ICounter tupleCounter;
final IStatsCollector collector;
final IOperatorStats stats;
final IOperatorStats parentStats;
+
private int minSz = Integer.MAX_VALUE;
private int maxSz = -1;
private long avgSz;
final String name;
+ public ICounter totalTime;
public ProfiledFrameWriter(IFrameWriter writer, IStatsCollector collector, String name, IOperatorStats stats,
IOperatorStats parentStats) {
@@ -55,103 +59,77 @@
this.parentStats = parentStats;
this.timeCounter = stats.getTimeCounter();
this.tupleCounter = parentStats != null ? parentStats.getTupleCounter() : null;
+ this.totalTime = new Counter("totalTime");
+ }
+
+ public static void timeMethod(HyracksRunnable r, ICounter c) throws HyracksDataException {
+ long nt = 0;
+ try {
+ nt = System.nanoTime();
+ r.run();
+ } finally {
+ c.update(System.nanoTime() - nt);
+ }
+ }
+
+ private void timeMethod(HyracksConsumer<ByteBuffer> c, ByteBuffer buffer) throws HyracksDataException {
+ long nt = 0;
+ try {
+ nt = System.nanoTime();
+ c.accept(buffer);
+ } finally {
+ totalTime.update(System.nanoTime() - nt);
+ }
}
@Override
public final void open() throws HyracksDataException {
- try {
- startClock();
- writer.open();
- } finally {
- stopClock();
+ timeMethod(writer::open, totalTime);
+ }
+
+ private void updateTupleStats(ByteBuffer buffer) {
+ int tupleCountOffset = FrameHelper.getTupleCountOffset(buffer.limit());
+ int tupleCount = IntSerDeUtils.getInt(buffer.array(), tupleCountOffset);
+ if (tupleCounter != null) {
+ long prevCount = tupleCounter.get();
+ for (int i = 0; i < tupleCount; i++) {
+ int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
+ if (maxSz < tupleLen) {
+ maxSz = tupleLen;
+ }
+ if (minSz > tupleLen) {
+ minSz = tupleLen;
+ }
+ long prev = avgSz * prevCount;
+ avgSz = (prev + tupleLen) / (prevCount + 1);
+ prevCount++;
+ }
+ parentStats.getMaxTupleSz().set(maxSz);
+ parentStats.getMinTupleSz().set(minSz);
+ parentStats.getAverageTupleSz().set(avgSz);
+ tupleCounter.update(tupleCount);
}
}
@Override
public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- try {
- int tupleCountOffset = FrameHelper.getTupleCountOffset(buffer.limit());
- int tupleCount = IntSerDeUtils.getInt(buffer.array(), tupleCountOffset);
- if (tupleCounter != null) {
- long prevCount = tupleCounter.get();
- for (int i = 0; i < tupleCount; i++) {
- int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
- if (maxSz < tupleLen) {
- maxSz = tupleLen;
- }
- if (minSz > tupleLen) {
- minSz = tupleLen;
- }
- long prev = avgSz * prevCount;
- avgSz = (prev + tupleLen) / (prevCount + 1);
- prevCount++;
- }
- parentStats.getMaxTupleSz().set(maxSz);
- parentStats.getMinTupleSz().set(minSz);
- parentStats.getAverageTupleSz().set(avgSz);
- tupleCounter.update(tupleCount);
- }
- startClock();
- writer.nextFrame(buffer);
- } finally {
- stopClock();
- }
+ updateTupleStats(buffer);
+ timeMethod(writer::nextFrame, buffer);
}
@Override
public final void flush() throws HyracksDataException {
- try {
- startClock();
- writer.flush();
- } finally {
- stopClock();
- }
+ timeMethod(writer::flush, totalTime);
}
@Override
public final void fail() throws HyracksDataException {
- writer.fail();
+ timeMethod(writer::fail, totalTime);
}
@Override
public void close() throws HyracksDataException {
- try {
- startClock();
- writer.close();
- } finally {
- stopClock();
- }
- }
-
- private void stopClock() {
- pause();
- collector.giveClock(this);
- }
-
- private void startClock() {
- if (frameStart > 0) {
- return;
- }
- frameStart = collector.takeClock(this);
- }
-
- @Override
- public void resume() {
- if (frameStart > 0) {
- return;
- }
- long nt = System.nanoTime();
- frameStart = nt;
- }
-
- @Override
- public void pause() {
- if (frameStart > 1) {
- long nt = System.nanoTime();
- long delta = nt - frameStart;
- timeCounter.update(delta);
- frameStart = -1;
- }
+ timeMethod(writer::close, totalTime);
}
private int getTupleStartOffset(int tupleIndex, int tupleCountOffset, ByteBuffer buffer) {
@@ -179,4 +157,9 @@
} else
return writer;
}
+
+ public long getTotalTime() {
+ return totalTime.get();
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
index f787a1c..facf049 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
@@ -19,7 +19,9 @@
package org.apache.hyracks.api.dataflow;
import java.util.HashMap;
+import java.util.Map;
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -27,43 +29,51 @@
import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.job.profiling.OperatorStats;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
import org.apache.hyracks.api.rewriter.runtime.SuperActivityOperatorNodePushable;
-public class ProfiledOperatorNodePushable extends ProfiledFrameWriter implements IOperatorNodePushable, IPassableTimer {
+public class ProfiledOperatorNodePushable implements IOperatorNodePushable {
IOperatorNodePushable op;
- ProfiledOperatorNodePushable parentOp;
ActivityId acId;
- HashMap<Integer, IFrameWriter> inputs;
- long frameStart;
+ Map<Integer, ProfiledFrameWriter> inputs;
+ Map<Integer, ProfiledOperatorNodePushable> parents;
+
+ Map<Integer, ProfiledFrameWriter> outputs;
+ IOperatorStats stats;
+ IStatsCollector collector;
+
+ ICounter totalTime;
ProfiledOperatorNodePushable(IOperatorNodePushable op, ActivityId acId, IStatsCollector collector,
- IOperatorStats stats, ActivityId parent, ProfiledOperatorNodePushable parentOp)
- throws HyracksDataException {
- super(null, collector, acId.toString() + " - " + op.getDisplayName(), stats,
- parentOp != null ? parentOp.getStats() : null);
- this.parentOp = parentOp;
+ IOperatorStats stats, ProfiledOperatorNodePushable parentOp) {
+ this.stats = stats;
+ this.collector = collector;
+ this.parents = new HashMap<>();
+ parents.put(0, parentOp);
this.op = op;
this.acId = acId;
inputs = new HashMap<>();
+ outputs = new HashMap<>();
+ this.totalTime = new Counter("totalTime");
}
@Override
public void initialize() throws HyracksDataException {
- synchronized (collector) {
- startClock();
- op.initialize();
- stopClock();
- }
+ ProfiledFrameWriter.timeMethod(op::initialize, totalTime);
}
@Override
public void deinitialize() throws HyracksDataException {
- synchronized (collector) {
- startClock();
- op.deinitialize();
- stopClock();
+ long unNestTime = totalTime.get();
+ for (ProfiledFrameWriter i : inputs.values()) {
+ unNestTime += i.getTotalTime();
}
+ for (ProfiledFrameWriter w : outputs.values()) {
+ unNestTime -= w.getTotalTime();
+ }
+ op.deinitialize();
+ stats.getTimeCounter().set(unNestTime);
}
@Override
@@ -74,17 +84,24 @@
@Override
public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
throws HyracksDataException {
+ if (writer instanceof ProfiledFrameWriter) {
+ ProfiledFrameWriter wrapper = (ProfiledFrameWriter) writer;
+ outputs.put(index, wrapper);
+ }
op.setOutputFrameWriter(index, writer, recordDesc);
}
@Override
public IFrameWriter getInputFrameWriter(int index) {
- IFrameWriter ifw = op.getInputFrameWriter(index);
- if (!(op instanceof ProfiledFrameWriter) && ifw.equals(op)) {
- return new ProfiledFrameWriter(op.getInputFrameWriter(index), collector,
+ if (inputs.get(index) == null) {
+ IOperatorStats parentStats = parents.get(index) == null ? null : parents.get(index).getStats();
+ ProfiledFrameWriter pfw = new ProfiledFrameWriter(op.getInputFrameWriter(index), collector,
acId.toString() + "-" + op.getDisplayName(), stats, parentStats);
+ inputs.put(index, pfw);
+ return pfw;
+ } else {
+ return inputs.get(index);
}
- return op.getInputFrameWriter(index);
}
@Override
@@ -92,45 +109,14 @@
return op.getDisplayName();
}
- private void stopClock() {
- pause();
- collector.giveClock(this);
- }
-
- private void startClock() {
- if (frameStart > 0) {
- return;
- }
- frameStart = collector.takeClock(this);
- }
-
- @Override
- public void resume() {
- if (frameStart > 0) {
- return;
- }
- long nt = System.nanoTime();
- frameStart = nt;
- }
-
- @Override
- public void pause() {
- if (frameStart > 0) {
- long nt = System.nanoTime();
- long delta = nt - frameStart;
- timeCounter.update(delta);
- frameStart = -1;
- }
+ public void addParent(int index, ProfiledOperatorNodePushable parent) {
+ parents.put(index, parent);
}
public IOperatorStats getStats() {
return stats;
}
- public IOperatorStats getParentStats() {
- return parentStats;
- }
-
public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx, ActivityId acId,
ProfiledOperatorNodePushable source) throws HyracksDataException {
String name = acId.toString() + " - " + op.getDisplayName();
@@ -141,7 +127,7 @@
((IIntrospectingOperator) op).setOperatorStats(stats);
}
if (!(op instanceof ProfiledOperatorNodePushable) && !(op instanceof SuperActivityOperatorNodePushable)) {
- return new ProfiledOperatorNodePushable(op, acId, ctx.getStatsCollector(), stats, acId, source);
+ return new ProfiledOperatorNodePushable(op, acId, ctx.getStatsCollector(), stats, source);
}
return op;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
index d2ad20c..3169c81 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
@@ -21,7 +21,6 @@
import java.io.Serializable;
import java.util.Map;
-import org.apache.hyracks.api.dataflow.IPassableTimer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IWritable;
@@ -54,17 +53,4 @@
*/
IOperatorStats getAggregatedStats();
- /**
- * Pause an operator's timer, to pass it to another operator
- * @param newHolder the timer that is starting execution
- * @return the current nanoTime when the clock was taken from the other operator
- */
- long takeClock(IPassableTimer newHolder);
-
- /**
- * Resume an operator's timer, when a downstream operator has finished execution of
- * the method the upstream operator called
- * @param currHolder the timer that needs to be paused
- */
- void giveClock(IPassableTimer currHolder);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index efd4e07..de28318 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -158,6 +158,12 @@
}
operatorNodePushablesBFSOrder.add(destOp);
operatorNodePushables.put(destId, destOp);
+ } else if (profile) {
+ if (destOp instanceof ProfiledOperatorNodePushable
+ && sourceOp instanceof ProfiledOperatorNodePushable) {
+ ((ProfiledOperatorNodePushable) destOp).addParent(inputChannel,
+ (ProfiledOperatorNodePushable) sourceOp);
+ }
}
/*
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConsumer.java
similarity index 64%
copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
copy to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConsumer.java
index 6afbccb..c5abb5e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConsumer.java
@@ -16,17 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.api.dataflow;
+package org.apache.hyracks.api.util;
-public interface IPassableTimer {
- /*
- A timer intended to be used for timing the individual components of a
- pipelined process. An instance of IPassableTimer is held by each method
- in the pipeline, and is paused() when that method passes off control to
- a component above it, and is resume()d when the component above it returns.
- */
+import org.apache.hyracks.api.exceptions.HyracksDataException;
- void pause();
-
- void resume();
+@FunctionalInterface
+public interface HyracksConsumer<T> {
+ void accept(final T elem) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksRunnable.java
similarity index 64%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksRunnable.java
index 6afbccb..a82c150 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksRunnable.java
@@ -16,17 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.api.dataflow;
+package org.apache.hyracks.api.util;
-public interface IPassableTimer {
- /*
- A timer intended to be used for timing the individual components of a
- pipelined process. An instance of IPassableTimer is held by each method
- in the pipeline, and is paused() when that method passes off control to
- a component above it, and is resume()d when the component above it returns.
- */
+import org.apache.hyracks.api.exceptions.HyracksDataException;
- void pause();
-
- void resume();
+@FunctionalInterface
+public interface HyracksRunnable {
+ void run() throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
index 41beac0..76c8017 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
@@ -21,23 +21,19 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayDeque;
import java.util.Collections;
-import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
-import org.apache.hyracks.api.dataflow.IPassableTimer;
import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
import org.apache.hyracks.api.job.profiling.OperatorStats;
public class StatsCollector implements IStatsCollector {
- private static final long serialVersionUID = 6858817639895434572L;
+ private static final long serialVersionUID = 6858817639895434573L;
private final Map<String, IOperatorStats> operatorStatsMap = new LinkedHashMap<>();
- private transient Deque<IPassableTimer> clockHolder = new ArrayDeque<>();
@Override
public void add(IOperatorStats operatorStats) {
@@ -91,23 +87,4 @@
}
}
- @Override
- public long takeClock(IPassableTimer newHolder) {
- if (newHolder != null) {
- if (clockHolder.peek() != null) {
- clockHolder.peek().pause();
- }
- clockHolder.push(newHolder);
- }
- return System.nanoTime();
- }
-
- @Override
- public void giveClock(IPassableTimer currHolder) {
- clockHolder.removeLastOccurrence(currHolder);
- if (clockHolder.peek() != null) {
- clockHolder.peek().resume();
- }
- }
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 4036f00..546360a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -128,7 +128,7 @@
opTimes.forEach((key, value) -> {
ObjectNode jpe = om.createObjectNode();
jpe.put("name", key);
- jpe.put("time", Double
+ jpe.put("run-time", Double
.parseDouble(new DecimalFormat("#.####").format((double) value.getTimeCounter().get() / 1000000)));
if (value.getId().getId() >= 0) {
jpe.put("runtime-id", value.getId().toString());
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
index 815536b..6e58e9c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -30,14 +30,12 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger;
import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.Algorithm;
import org.apache.hyracks.dataflow.std.sort.IRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.ProfiledRunGenerator;
/**
* This Operator pushes group-by aggregation into the external sort.
@@ -143,13 +141,11 @@
@Override
protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescriptorProvider) throws HyracksDataException {
- final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
IRunGenerator runGen = new ExternalSortGroupByRunGenerator(ctx, sortFields,
recordDescriptorProvider.getInputRecordDescriptor(this.getActivityId(), 0), framesLimit,
groupFields, keyNormalizerFactories, comparatorFactories, partialAggregatorFactory,
partialAggRecordDesc, ALG);
- return profile ? ProfiledRunGenerator.time(runGen, ctx, "GroupBy (Sort Runs)", this.getActivityId())
- : runGen;
+ return runGen;
}
};
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
index 67b0686..dc2b46f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
@@ -224,6 +224,11 @@
}
@Override
+ public String getDisplayName() {
+ return "Intersect";
+ }
+
+ @Override
public IFrameWriter getInputFrameWriter(final int index) {
return new IFrameWriter() {
private final int[] normalizedKey1 =
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index ebe871b..38320ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -30,7 +30,6 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
@@ -79,11 +78,9 @@
@Override
protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider) throws HyracksDataException {
- final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
IRunGenerator runGen = new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories,
comparatorFactories, outRecDescs[0], alg, policy, framesLimit, outputLimit);
- return profile ? ProfiledRunGenerator.time(runGen, ctx, "ExternalSort(Sort)", this.getActivityId())
- : runGen;
+ return runGen;
}
};
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ProfiledRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ProfiledRunGenerator.java
deleted file mode 100644
index 5cc1882..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ProfiledRunGenerator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.sort;
-
-import java.util.List;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.ProfiledFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.profiling.IOperatorStats;
-import org.apache.hyracks.api.job.profiling.IStatsCollector;
-import org.apache.hyracks.api.job.profiling.OperatorStats;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
-
-public class ProfiledRunGenerator extends ProfiledFrameWriter implements IRunGenerator {
-
- private final IRunGenerator runGenerator;
-
- private ProfiledRunGenerator(IRunGenerator runGenerator, IStatsCollector collector, String name,
- IOperatorStats stats, ActivityId root) {
- super(runGenerator, collector, name, stats, null);
- this.runGenerator = runGenerator;
- }
-
- @Override
- public List<GeneratedRunFileReader> getRuns() {
- return runGenerator.getRuns();
- }
-
- @Override
- public ISorter getSorter() {
- return runGenerator.getSorter();
- }
-
- public static IRunGenerator time(IRunGenerator runGenerator, IHyracksTaskContext ctx, String name, ActivityId root)
- throws HyracksDataException {
- if (!(runGenerator instanceof ProfiledRunGenerator)) {
- String statName = root.toString() + " - " + name;
- IStatsCollector statsCollector = ctx.getStatsCollector();
- IOperatorStats stats = new OperatorStats(statName);
- statsCollector.add(stats);
- return new ProfiledRunGenerator(runGenerator, ctx.getStatsCollector(), name, stats, root);
- }
- return runGenerator;
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
index 2322910..934ae60 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
@@ -29,9 +29,7 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescriptor {
@@ -63,16 +61,9 @@
@Override
protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider) {
- final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
IRunGenerator runGen = new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields,
keyNormalizerFactories, comparatorFactories, outRecDescs[0]);
- try {
- return profile ? ProfiledRunGenerator.time(runGen, ctx, "TopKSort (Sort)", this.getActivityId())
- : runGen;
- } catch (HyracksDataException e) {
- e.printStackTrace();
- }
- return null;
+ return runGen;
}
};
}