[ASTERIXDB-3477,3474][RT] Profiling fixes and improvements
- user model changes: yes
- storage format changes: no
- interface changes: no
Details:
The operator id was being improperly used as the key when updating
the min and max time. It needs to include the activity id to clearly
differentiate between the different activities of an operator.
Exchange time was previously being reported as the difference between
the open and close time of the connector, which is not right. Calculating
the actual time a connector is taking will require careful consideration
of each connector type and usage, such as 1-N vs N-M and pipelined vs.
materializing. Until then it is better to simply omit it, because it is
not typically a predominant factor in query time. If it is, the sampled
partition profile still exists and will be useful to find those cases.
Previously, operators that were the last before a non 1-1 exchange would
not have time or cardinalities reported. This is now fixed.
All operators and connectors now report total cardinalities aggregated
across all partitions.
Ext-ref:MB-63566
Change-Id: I172f0044112777aec7200a0c6ae906711fcdc5f2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18905
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
index 03900f3..10578d0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
@@ -24,6 +24,7 @@
},
"open-time": "R{[0-9]+}",
"close-time": "R{[0-9]+}",
+ "cardinality": "R{[0-9]+}",
"offset": "R{[0-9]+}",
"frame-times": [
0
@@ -35,7 +36,11 @@
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
- "runtime-id": "R{.+}"
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
},
{
"name": "R{.+}",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson
index e59f095..1936be7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson
@@ -185,6 +185,7 @@
},
"open-time": "R{[0-9]+}",
"close-time": "R{[0-9]+}",
+ "cardinality": "R{[0-9]+}",
"offset": "R{[0-9]+}",
"frame-times": [
0
@@ -223,7 +224,11 @@
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
- "runtime-id": "R{.+}"
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
},
{
"name": "R{.+}",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/plansleep/sleep.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/plansleep/sleep.3.regexjson
index 111ded8..4652e73 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/plansleep/sleep.3.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/plansleep/sleep.3.regexjson
@@ -31,6 +31,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 3,
"max-cardinality": 3,
+ "total-cardinality": 3,
"physical-operator": "STREAM_PROJECT",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -41,6 +42,9 @@
"runtime-id": "R{.+}",
"min-time": "R{[0-9.]+}",
"max-time": "R{[0-9.]+}",
+ "min-cardinality": "R{[0-9.]+}",
+ "max-cardinality": "R{[0-9.]+}",
+ "total-cardinality": "R{[0-9.]+}",
"physical-operator": "SORT_MERGE_EXCHANGE [$$49(ASC) ]",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -55,8 +59,23 @@
],
"operatorId": "1.5",
"runtime-id": "R{.+}",
- "min-time": "R{[0-9.]+}",
- "max-time": "R{[0-9.]+}",
+ "activity-stats": [
+ {
+ "name": "Sort (Run Generation)",
+ "id": "ANID:0",
+ "min-time": "R{[0-9.]+}",
+ "max-time": "R{[0-9.]+}"
+ },
+ {
+ "name": "Sort (Run Merge)",
+ "id": "ANID:1",
+ "min-time": "R{[0-9.]+}",
+ "max-time": "R{[0-9.]+}",
+ "min-cardinality": 3,
+ "max-cardinality": 3,
+ "total-cardinality": 3
+ }
+ ],
"physical-operator": "STABLE_SORT [$$49(ASC)]",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -81,6 +100,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 3,
"max-cardinality": 3,
+ "total-cardinality": 3,
"physical-operator": "STREAM_PROJECT",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -99,6 +119,7 @@
"max-time": "R{5.+}",
"min-cardinality": 3,
"max-cardinality": 3,
+ "total-cardinality": 3,
"physical-operator": "ASSIGN",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -115,6 +136,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 3,
"max-cardinality": 3,
+ "total-cardinality": 3,
"physical-operator": "STREAM_PROJECT",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -133,6 +155,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 3,
"max-cardinality": 3,
+ "total-cardinality": 3,
"physical-operator": "ASSIGN",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -177,10 +200,23 @@
],
"operatorId": "1.12",
"runtime-id": "R{.+}",
- "min-time": "R{[0-9.]+}",
- "max-time": "R{[0-9.]+}",
- "min-cardinality": 3,
- "max-cardinality": 3,
+ "activity-stats": [
+ {
+ "name": "Sort (Run Generation)",
+ "id": "ANID:0",
+ "min-time": "R{[0-9.]+}",
+ "max-time": "R{[0-9.]+}"
+ },
+ {
+ "name": "Sort (Run Merge)",
+ "id": "ANID:1",
+ "min-time": "R{[0-9.]+}",
+ "max-time": "R{[0-9.]+}",
+ "min-cardinality": 3,
+ "max-cardinality": 3,
+ "total-cardinality": 3
+ }
+ ],
"physical-operator": "SORT_GROUP_BY[$$56]",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -191,6 +227,9 @@
"runtime-id": "R{.+}",
"min-time": "R{[0-9.]+}",
"max-time": "R{[0-9.]+}",
+ "min-cardinality": 3,
+ "max-cardinality": 3,
+ "total-cardinality": 3,
"physical-operator": "HASH_PARTITION_EXCHANGE [$$56]",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -227,8 +266,23 @@
],
"operatorId": "1.14",
"runtime-id": "R{.+}",
- "min-time": "R{[0-9.]+}",
- "max-time": "R{[0-9.]+}",
+ "activity-stats": [
+ {
+ "name": "Sort (Run Generation)",
+ "id": "ANID:0",
+ "min-time": "R{[0-9.]+}",
+ "max-time": "R{[0-9.]+}"
+ },
+ {
+ "name": "Sort (Run Merge)",
+ "id": "ANID:1",
+ "min-time": "R{[0-9.]+}",
+ "max-time": "R{[0-9.]+}",
+ "min-cardinality": 3,
+ "max-cardinality": 3,
+ "total-cardinality": 3
+ }
+ ],
"physical-operator": "SORT_GROUP_BY[$$50]",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -252,6 +306,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 5,
"max-cardinality": 5,
+ "total-cardinality": 5,
"physical-operator": "STREAM_PROJECT",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -270,6 +325,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 5,
"max-cardinality": 5,
+ "total-cardinality": 5,
"physical-operator": "ASSIGN",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -283,6 +339,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 5,
"max-cardinality": 5,
+ "total-cardinality": 5,
"physical-operator": "STREAM_SELECT",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -298,6 +355,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 10,
"max-cardinality": 10,
+ "total-cardinality": 10,
"physical-operator": "STREAM_PROJECT",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
@@ -323,6 +381,7 @@
"max-time": "R{[0-9.]+}",
"min-cardinality": 10,
"max-cardinality": 10,
+ "total-cardinality": 10,
"physical-operator": "DATASOURCE_SCAN",
"execution-mode": "PARTITIONED",
"optimizer-estimates": "R{.+}",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
index e3e7647..2f74d34 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
@@ -24,6 +24,7 @@
},
"open-time": "R{[0-9]+}",
"close-time": "R{[0-9]+}",
+ "cardinality": "R{[0-9]+}",
"offset": "R{[0-9]+}",
"frame-times": [
0
@@ -35,7 +36,11 @@
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
- "runtime-id": "R{.+}"
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
}
]
},
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
index 98d7930..aef3a82 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
@@ -24,6 +24,7 @@
},
"open-time": "R{[0-9]+}",
"close-time": "R{[0-9]+}",
+ "cardinality": "R{[0-9]+}",
"offset": "R{[0-9]+}",
"frame-times": [
0
@@ -35,7 +36,11 @@
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
- "runtime-id": "R{.+}"
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
}
]
},
@@ -124,6 +129,7 @@
},
"open-time": "R{[0-9]+}",
"close-time": "R{[0-9]+}",
+ "cardinality": "R{[0-9]+}",
"offset": "R{[0-9]+}",
"frame-times": [
0
@@ -135,7 +141,11 @@
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
- "runtime-id": "R{.+}"
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
}
]
},
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 36c33e6..5c11e69 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -104,9 +105,27 @@
private static final String CONDITION_FIELD = "condition";
private static final String MISSING_VALUE_FIELD = "missing-value";
private static final String OPTIMIZER_ESTIMATES = "optimizer-estimates";
+
+ private static final String MIN_TIME = "min-time";
+
+ private static final String MAX_TIME = "max-time";
+
+ private static final String MIN_CARDINALITY = "min-cardinality";
+
+ private static final String MAX_CARDINALITY = "max-cardinality";
+
+ private static final String TOTAL_CARDINALITY = "total-cardinality";
+ private static final String NAME = "name";
+ private static final String ID = "id";
+ private static final String RUNTIME_ID = "runtime-id";
+
+ private static final String JOBLETS = "joblets";
+ private static final String CDID = "CDID";
+ private static final String ANID_START = "ANID:";
private final Map<AbstractLogicalOperator, String> operatorIdentity = new HashMap<>();
private Map<Object, String> log2odid = Collections.emptyMap();
private Map<String, OperatorProfile> profile = Collections.emptyMap();
+
private final IdCounter idCounter = new IdCounter();
private final JsonGenerator jsonGenerator;
@@ -154,7 +173,7 @@
}
}
- private class ExtendedActivityId {
+ private static class ExtendedActivityId {
private final OperatorDescriptorId odId;
private final int id;
private final int microId;
@@ -162,7 +181,7 @@
private final int subId;
ExtendedActivityId(String str) {
- if (str.startsWith("ANID:")) {
+ if (str.startsWith(ANID_START)) {
str = str.substring(5);
int idIdx = str.lastIndexOf(':');
this.odId = OperatorDescriptorId.parse(str.substring(0, idIdx));
@@ -201,11 +220,15 @@
@Override
public String toString() {
- return "ANID:" + odId + ":" + getLocalId();
+ return "ANID:" + odId + ":" + id + "." + getLocalId();
+ }
+
+ private String getActivityAndLocalId() {
+ return "ANID:" + id + (!getLocalId().isEmpty() ? "." + getLocalId() : "");
}
private void catenateId(StringBuilder sb, int i) {
- if (sb.length() == 0) {
+ if (sb.isEmpty()) {
sb.append(i);
return;
}
@@ -213,9 +236,8 @@
sb.append(i);
}
- public String getLocalId() {
+ private String getLocalId() {
StringBuilder sb = new StringBuilder();
- catenateId(sb, odId.getId());
if (microId > 0) {
catenateId(sb, microId);
}
@@ -227,63 +249,83 @@
}
}
+ private static class MinMax<T extends Comparable<T>> {
+ T min;
+ T max;
+
+ public MinMax(T initial) {
+ this.min = initial;
+ this.max = initial;
+ }
+
+ public void update(T val) {
+ min = min.compareTo(val) > 0 ? val : min;
+ max = max.compareTo(val) < 0 ? val : max;
+ }
+
+ }
+
private class OperatorProfile {
- Map<String, Pair<Double, Double>> activityTimes;
- Map<String, Pair<Long, Long>> activityCards;
+
+ Map<String, String> activityNames;
+ Map<String, MinMax<Double>> activityTimes;
+ Map<String, MinMax<Long>> activityCards;
+ Map<String, Long> activityCardTotal;
OperatorProfile() {
+ activityNames = new HashMap<>();
activityTimes = new HashMap<>();
activityCards = new HashMap<>();
+ activityCardTotal = new HashMap<>();
}
- void updateOperator(String extendedOpId, double time, long cardinality) {
- updateMinMax(time, extendedOpId, activityTimes);
- if (cardinality > 0) {
- updateMinMax(cardinality, extendedOpId, activityCards);
- }
+ void updateOperator(String extendedOpId, String name, double time, long cardinality) {
+ updateOperator(extendedOpId, name, time);
+ updateMinMax(cardinality, extendedOpId, activityCards);
+ activityCardTotal.compute(extendedOpId, (id, total) -> total == null ? cardinality : total + cardinality);
}
- void updateOperator(String extendedOpId, double time) {
+ void updateOperator(String extendedOpId, String name, double time) {
+ activityNames.put(extendedOpId, name);
updateMinMax(time, extendedOpId, activityTimes);
}
- private <T extends Comparable<T>> void updateMinMax(T comp, String id, Map<String, Pair<T, T>> opMap) {
- Pair<T, T> times = opMap.computeIfAbsent(id, i -> new Pair(comp, comp));
- if (times.getFirst().compareTo(comp) > 0) {
- times.setFirst(comp);
- }
- if (times.getSecond().compareTo(comp) < 0) {
- times.setSecond(comp);
- }
+ private <T extends Comparable<T>> void updateMinMax(T comp, String id, Map<String, MinMax<T>> opMap) {
+ MinMax<T> stat = opMap.computeIfAbsent(id, i -> new MinMax<>(comp));
+ stat.update(comp);
}
}
- private ExtendedActivityId acIdFromName(String name) {
+ private Pair<ExtendedActivityId, String> splitAcId(String name) {
String[] parts = name.split(" - ");
- return new ExtendedActivityId(parts[0]);
+ return new Pair<>(new ExtendedActivityId(parts[0]), parts[1]);
}
Map<String, OperatorProfile> processProfile(ObjectNode profile) {
Map<String, OperatorProfile> profiledOps = new HashMap<>();
- for (JsonNode joblet : profile.get("joblets")) {
+ for (JsonNode joblet : profile.get(JOBLETS)) {
for (JsonNode task : joblet.get("tasks")) {
for (JsonNode counters : task.get("counters")) {
- OperatorProfile info = profiledOps.computeIfAbsent(counters.get("runtime-id").asText(),
- i -> new OperatorProfile());
+ if (counters.get(RUNTIME_ID).asText().contains(CDID)) {
+ continue;
+ }
+ OperatorProfile info =
+ profiledOps.computeIfAbsent(counters.get(RUNTIME_ID).asText(), i -> new OperatorProfile());
+ Pair<ExtendedActivityId, String> identities = splitAcId(counters.get(NAME).asText());
JsonNode card = counters.get("cardinality-out");
if (card != null) {
- info.updateOperator(acIdFromName(counters.get("name").asText()).getLocalId(),
- counters.get("run-time").asDouble(), counters.get("cardinality-out").asLong(-1));
+ info.updateOperator(identities.first.getActivityAndLocalId(), identities.second,
+ counters.get("run-time").asDouble(), card.asLong(0));
+ } else {
+ info.updateOperator(identities.first.getActivityAndLocalId(), identities.second,
+ counters.get("run-time").asDouble());
}
- info.updateOperator(acIdFromName(counters.get("name").asText()).getLocalId(),
- counters.get("run-time").asDouble());
}
for (JsonNode partition : task.get("partition-send-profile")) {
String id = partition.get("partition-id").get("connector-id").asText();
OperatorProfile info = profiledOps.computeIfAbsent(id, i -> new OperatorProfile());
- //CDIDs are unique
- info.updateOperator("0",
- partition.get("close-time").asDouble() - partition.get("open-time").asDouble());
+ //TODO: connector times need to be calculated accurately, until then they won't be included
+ info.updateOperator("0", "", 0, partition.get("cardinality").asLong(0));
}
}
}
@@ -350,6 +392,45 @@
}
}
+ private void printActivityStats(Optional<MinMax<Double>> time, Optional<MinMax<Long>> card,
+ Optional<Long> totalCard) throws IOException {
+ if (time.isPresent()) {
+ jsonGenerator.writeNumberField(MIN_TIME, time.get().min);
+ jsonGenerator.writeNumberField(MAX_TIME, time.get().max);
+ }
+ if (card.isPresent()) {
+ jsonGenerator.writeNumberField(MIN_CARDINALITY, card.get().min);
+ jsonGenerator.writeNumberField(MAX_CARDINALITY, card.get().max);
+ }
+ if (totalCard.isPresent()) {
+ jsonGenerator.writeNumberField(TOTAL_CARDINALITY, totalCard.get());
+ }
+ }
+
+ private void printOperatorStats(OperatorProfile info) throws IOException {
+ if (info.activityTimes.size() == 1) {
+ Optional<MinMax<Double>> times = info.activityTimes.values().stream().findFirst();
+ Optional<MinMax<Long>> cards = info.activityCards.values().stream().findFirst();
+ Optional<Long> total = info.activityCardTotal.values().stream().findFirst();
+ printActivityStats(times, cards, total);
+ } else if (info.activityTimes.size() > 1) {
+ jsonGenerator.writeArrayFieldStart("activity-stats");
+ for (String acId : info.activityTimes.keySet()) {
+ jsonGenerator.writeStartObject();
+ String prettyName = info.activityNames.get(acId);
+ if (prettyName != null) {
+ jsonGenerator.writeStringField(NAME, prettyName);
+ }
+ jsonGenerator.writeStringField("id", acId);
+ printActivityStats(Optional.ofNullable(info.activityTimes.get(acId)),
+ Optional.ofNullable(info.activityCards.get(acId)),
+ Optional.ofNullable(info.activityCardTotal.get(acId)));
+ jsonGenerator.writeEndObject();
+ }
+ jsonGenerator.writeEndArray();
+ }
+ }
+
private void printOperatorImpl(AbstractLogicalOperator op, boolean printInputs, boolean printOptimizerEstimates)
throws AlgebricksException {
try {
@@ -358,34 +439,10 @@
jsonGenerator.writeStringField("operatorId", idCounter.printOperatorId(op));
String od = log2odid.get(op);
if (od != null) {
- jsonGenerator.writeStringField("runtime-id", od);
+ jsonGenerator.writeStringField(RUNTIME_ID, od);
OperatorProfile info = profile.get(od);
if (info != null) {
- if (info.activityTimes.size() == 1) {
- Pair<Double, Double> minMax = info.activityTimes.values().iterator().next();
- jsonGenerator.writeNumberField("min-time", minMax.first);
- jsonGenerator.writeNumberField("max-time", minMax.second);
- if (info.activityCards.size() > 0) {
- Pair<Long, Long> minMaxCard = info.activityCards.values().iterator().next();
- jsonGenerator.writeNumberField("min-cardinality", minMaxCard.first);
- jsonGenerator.writeNumberField("max-cardinality", minMaxCard.second);
- }
- } else {
- jsonGenerator.writeObjectFieldStart("times");
- for (String acId : info.activityTimes.keySet()) {
- jsonGenerator.writeObjectFieldStart(acId);
- jsonGenerator.writeNumberField("min-time", info.activityTimes.get(acId).first);
- jsonGenerator.writeNumberField("max-time", info.activityTimes.get(acId).second);
- Pair<Long, Long> cards = info.activityCards.get(acId);
- if (cards != null) {
- jsonGenerator.writeNumberField("min-cardinality", info.activityCards.get(acId).first);
- jsonGenerator.writeNumberField("max-cardinality", info.activityCards.get(acId).second);
- }
- jsonGenerator.writeEndObject();
- }
- jsonGenerator.writeEndObject();
- }
-
+ printOperatorStats(info);
}
}
IPhysicalOperator pOp = op.getPhysicalOperator();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
index 354f172..bf4533f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
@@ -70,7 +70,7 @@
public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
if (writer instanceof ITimedWriter) {
ITimedWriter wrapper = (ITimedWriter) writer;
- wrapper.setUpstreamStats(stats);
+ wrapper.setInputStats(stats);
outputs.put(index, wrapper);
}
wrapped.setOutputFrameWriter(index, writer, recordDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 2a3fa7e..f6d9cd1 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -265,7 +265,7 @@
}
@Override
- public void setUpstreamStats(IOperatorStats stats) {
+ public void setInputStats(IOperatorStats stats) {
parentStats = stats;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java
index 7b0f8c8..e4f990f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java
@@ -22,7 +22,7 @@
import org.apache.hyracks.api.job.profiling.IOperatorStats;
public interface ITimedWriter extends IFrameWriter {
- void setUpstreamStats(IOperatorStats stats);
+ void setInputStats(IOperatorStats stats);
long getTotalTime();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
index cfb0e7b..fb4ed3c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
@@ -41,11 +41,12 @@
// The downstream data consumer of this writer.
private final IFrameWriter writer;
- protected IOperatorStats upstreamStats = NoOpOperatorStats.INSTANCE;
+ protected IOperatorStats inputStats = NoOpOperatorStats.INSTANCE;
private int minSz = Integer.MAX_VALUE;
private int maxSz = -1;
private long avgSz;
- private ICounter totalTime;
+
+ private final ICounter totalTime;
public ProfiledFrameWriter(IFrameWriter writer) {
this.writer = writer;
@@ -53,8 +54,8 @@
}
@Override
- public void setUpstreamStats(IOperatorStats stats) {
- this.upstreamStats = stats;
+ public void setInputStats(IOperatorStats stats) {
+ this.inputStats = stats;
}
public static void timeMethod(HyracksRunnable r, ICounter c) throws HyracksDataException {
@@ -78,14 +79,14 @@
}
@Override
- public final void open() throws HyracksDataException {
+ public void open() throws HyracksDataException {
timeMethod(writer::open, totalTime);
}
private void updateTupleStats(ByteBuffer buffer) {
int tupleCountOffset = FrameHelper.getTupleCountOffset(buffer.limit());
int tupleCount = IntSerDeUtils.getInt(buffer.array(), tupleCountOffset);
- ICounter tupleCounter = upstreamStats.getTupleCounter();
+ ICounter tupleCounter = inputStats.getTupleCounter();
long prevCount = tupleCounter.get();
for (int i = 0; i < tupleCount; i++) {
int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
@@ -99,25 +100,25 @@
avgSz = (prev + tupleLen) / (prevCount + 1);
prevCount++;
}
- upstreamStats.getMaxTupleSz().set(maxSz);
- upstreamStats.getMinTupleSz().set(minSz);
- upstreamStats.getAverageTupleSz().set(avgSz);
+ inputStats.getMaxTupleSz().set(maxSz);
+ inputStats.getMinTupleSz().set(minSz);
+ inputStats.getAverageTupleSz().set(avgSz);
tupleCounter.update(tupleCount);
}
@Override
- public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
updateTupleStats(buffer);
timeMethod(writer::nextFrame, buffer);
}
@Override
- public final void flush() throws HyracksDataException {
+ public void flush() throws HyracksDataException {
timeMethod(writer::flush, totalTime);
}
@Override
- public final void fail() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
timeMethod(writer::fail, totalTime);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
index 1984d8e..cb188c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
@@ -77,9 +77,9 @@
if (writer instanceof ITimedWriter) {
ITimedWriter wrapper = (ITimedWriter) writer;
if (op instanceof ISelfProfilingNodePushable) {
- wrapper.setUpstreamStats(((ISelfProfilingNodePushable) op).getStats());
+ wrapper.setInputStats(((ISelfProfilingNodePushable) op).getStats());
} else {
- wrapper.setUpstreamStats(stats);
+ wrapper.setInputStats(stats);
}
outputs.put(index, wrapper);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/PartitionProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/PartitionProfile.java
index 9552a26..9a57e30 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/PartitionProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/PartitionProfile.java
@@ -35,6 +35,8 @@
private long openTime;
private long closeTime;
+ private long totalTime;
+ private long card;
private MultiResolutionEventProfiler mrep;
@@ -48,10 +50,13 @@
}
- public PartitionProfile(PartitionId pid, long openTime, long closeTime, MultiResolutionEventProfiler mrep) {
+ public PartitionProfile(PartitionId pid, long openTime, long closeTime, long totalTime, long card,
+ MultiResolutionEventProfiler mrep) {
this.pid = pid;
this.openTime = openTime;
this.closeTime = closeTime;
+ this.totalTime = totalTime;
+ this.card = card;
this.mrep = mrep;
}
@@ -67,6 +72,10 @@
return closeTime;
}
+ public long getCardinality() {
+ return card;
+ }
+
public MultiResolutionEventProfiler getSamples() {
return mrep;
}
@@ -75,6 +84,8 @@
public void writeFields(DataOutput output) throws IOException {
output.writeLong(closeTime);
output.writeLong(openTime);
+ output.writeLong(totalTime);
+ output.writeLong(card);
mrep.writeFields(output);
pid.writeFields(output);
}
@@ -83,6 +94,8 @@
public void readFields(DataInput input) throws IOException {
closeTime = input.readLong();
openTime = input.readLong();
+ totalTime = input.readLong();
+ card = input.readLong();
mrep = MultiResolutionEventProfiler.create(input);
pid = PartitionId.create(input);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 84376f6..b66be1e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -103,6 +103,7 @@
ppObj.set("partition-id", pidObj);
ppObj.put("open-time", pp.getOpenTime());
ppObj.put("close-time", pp.getCloseTime());
+ ppObj.put("cardinality", pp.getCardinality());
MultiResolutionEventProfiler samples = pp.getSamples();
ppObj.put("offset", samples.getOffset());
int resolution = samples.getResolution();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
index e51d2bd..65f3f31 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
@@ -24,7 +24,10 @@
import org.apache.hyracks.api.comm.IPartitionWriterFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.ProfiledFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.OperatorStats;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
import org.apache.hyracks.control.common.job.profiling.om.PartitionProfile;
@@ -53,45 +56,42 @@
public IFrameWriter createFrameWriter(final int receiverIndex) throws HyracksDataException {
final IFrameWriter writer = new ConnectorSenderProfilingFrameWriter(ctx,
delegate.createFrameWriter(receiverIndex), cd.getConnectorId(), senderIndex, receiverIndex);
- return new IFrameWriter() {
+ return new ProfiledFrameWriter(writer) {
private long openTime;
-
private long closeTime;
MultiResolutionEventProfiler mrep = new MultiResolutionEventProfiler(N_SAMPLES);
+ IOperatorStats stats = new OperatorStats(cd.getDisplayName(), cd.getConnectorId().toString());
+
@Override
public void open() throws HyracksDataException {
+ super.setInputStats(stats);
openTime = System.currentTimeMillis();
- writer.open();
+ super.open();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
mrep.reportEvent();
- writer.nextFrame(buffer);
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
+ super.nextFrame(buffer);
}
@Override
public void close() throws HyracksDataException {
closeTime = System.currentTimeMillis();
- try {
- ((Task) ctx).setPartitionSendProfile(
- new PartitionProfile(new PartitionId(ctx.getJobletContext().getJobId(), cd.getConnectorId(),
- senderIndex, receiverIndex), openTime, closeTime, mrep));
- } finally {
- writer.close();
+ long ownTime = getTotalTime();
+ if (stats != null) {
+ stats.getTimeCounter().set(ownTime);
}
- }
-
- @Override
- public void flush() throws HyracksDataException {
- writer.flush();
+ try {
+ ((Task) ctx).setPartitionSendProfile(new PartitionProfile(
+ new PartitionId(ctx.getJobletContext().getJobId(), cd.getConnectorId(), senderIndex,
+ receiverIndex),
+ openTime, closeTime, super.getTotalTime(), stats.getTupleCounter().get(), mrep));
+ } finally {
+ super.close();
+ }
}
};
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 6225d4c..410cb009 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -38,6 +38,7 @@
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.ProfiledFrameWriter;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -182,7 +183,12 @@
LOGGER.trace("input: {}: {}", i, conn.getConnectorId());
IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
partitionCount, td.getOutputPartitionCounts()[i]);
- writer = (enforce && !profile) ? EnforceFrameWriter.enforce(writer) : writer;
+ if (profile) {
+ //needed to propagate cardinality to the last operator before an exchange
+ writer = new ProfiledFrameWriter(writer);
+ } else {
+ writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
+ }
operator.setOutputFrameWriter(i, writer, recordDesc);
}
}