[ASTERIX-3283][RT] Profile micro-ops and subplans
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details:
Introduce the idea of a self-profiling operator, and modify
MetaOperator to be one of these. Also introduce a numbering scheme
for identifying a MicroOp inside a MetaOp by its position within the
pipeline, e.g. MicroOp position 2 in MetaOp ODID:3 would be ODID:3.2
Furthermore extend profiling into subplans, based on the MicroOp position.
The subplan numbering is extended off of the MicroOp positioning on the
subplan pipeline number and operator position. So for example Operator 3
with a Subplan in position 0 with a unary subplan with subplan MicroOp at
position 3 would be ODID:3.0.0.3
Also introduce interfaces for some of the profiling methods rather than
using concrete classes, and simplify the passing of parent stats between
IProfiledOperatorNodePushable instances by using the input/output instead
of passing it as a constructor parameter
Change-Id: Ib266878bf05782506045abfadaa83b41f0f9598b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17864
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java
index 7c945fc..413f002 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java
@@ -30,9 +30,6 @@
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-/**
- * Runs the cluster state runtime tests with the storage parallelism.
- */
@RunWith(Parameterized.class)
public class ProfiledExecutionTest {
protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/cc-single.conf";
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
index a404639..d3664d4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
@@ -44,5 +44,11 @@
<output-dir compare="Text">sleep</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="profile">
+ <compilation-unit name="non-unary-subplan">
+ <parameter name="profile" value="timings" type="string"/>
+ <output-dir compare="Text">non-unary-subplan</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.1.ddl.sqlpp
new file mode 100644
index 0000000..c1ce4a3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.1.ddl.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create type TType as open
+{ id: bigint };
+
+create dataset TData (TType) primary key id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.2.update.sqlpp
new file mode 100644
index 0000000..3ce9554
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.2.update.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into TData ( [
+{'id':1, 'x':1, 'f':19},
+{'id':2, 'x':2, 'f':12},
+{'id':3, 'x':1, 'f':10},
+{'id':4, 'x':2, 'f':17},
+{'id':5, 'x':1, 'f':12}
+]);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.3.profile.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.3.profile.sqlpp
new file mode 100644
index 0000000..4fa986c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.3.profile.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- compareunorderedarray=true
+use test;
+
+select value
+array_sum((
+ select value a.f
+ from g as p
+ union all
+ select value a.f
+ from g as w
+))
+from TData as a
+group by a.x as x group as g
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp
index 1358a62..6c35376 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp
@@ -24,4 +24,4 @@
FROM Customers c
WHERE c.age <65
GROUP BY c.address.city
-ORDER BY sleep(city,1666);
\ No newline at end of file
+ORDER BY sleep(city,1700);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
index 88107b0..03900f3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
@@ -77,6 +77,24 @@
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}"
}
]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson
new file mode 100644
index 0000000..e59f095
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson
@@ -0,0 +1,241 @@
+{
+ "job-id": "R{[A-Z0-9.:]+}",
+ "create-time": "R{[0-9.]+}",
+ "start-time": "R{[0-9.]+}",
+ "queued-time": "R{.+}",
+ "end-time": "R{[0-9.]+}",
+ "counters": [],
+ "joblets": [
+ {
+ "node-id": "R{.+}",
+ "counters": [],
+ "tasks": [
+ {
+ "activity-id": "R{[A-Z0-9.:]+}",
+ "partition": "R{[0-9]+}",
+ "attempt": "R{[0-9]+}",
+ "partition-send-profile": [],
+ "counters": [
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}"
+ }
+ ]
+ },
+ {
+ "activity-id": "R{[A-Z0-9.:]+}",
+ "partition": "R{[0-9]+}",
+ "attempt": "R{[0-9]+}",
+ "partition-send-profile": [],
+ "counters": [
+ {
+ "name": "R{ANID:ODID:[0-9]:0\\.1 - MicroOp Subplan(?:.|\n)+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}"
+ }
+ ]
+ },
+ {
+ "activity-id": "R{[A-Z0-9.:]+}",
+ "partition": "R{[0-9]+}",
+ "attempt": "R{[0-9]+}",
+ "partition-send-profile": [
+ {
+ "partition-id": {
+ "job-id": "R{[A-Z0-9.:]+}",
+ "connector-id": "R{[A-Z0-9.:]+}",
+ "sender-index": "R{[0-9]+}",
+ "receiver-index": "R{[0-9]+}"
+ },
+ "open-time": "R{[0-9]+}",
+ "close-time": "R{[0-9]+}",
+ "offset": "R{[0-9]+}",
+ "frame-times": [
+ 0
+ ],
+ "resolution": 1
+ }
+ ],
+ "counters": [
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ }
+ ]
+ }
+ ]
+ }]
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson
index bfb8c62..e6d1c0a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson
@@ -19,6 +19,15 @@
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}"
},
{
@@ -34,6 +43,15 @@
},
{
"name": "R{.+}",
+ "run-time": "R{5.+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": 10,
+ "avg-tuple-size": 38,
+ "min-tuple-size": 38,
+ "max-tuple-size": 38
+ },
+ {
+ "name": "R{.+}",
"run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
"cardinality-out": "R{[0-9.]+}",
@@ -43,12 +61,12 @@
},
{
"name": "R{.+}",
- "run-time": "R{5.+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
- "cardinality-out": 10,
- "avg-tuple-size": 25,
- "min-tuple-size": 25,
- "max-tuple-size": 25
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
}
]
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
index 012133e..e3e7647 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
@@ -80,6 +80,15 @@
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
}
]
},
@@ -93,10 +102,24 @@
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
- "cardinality-out": "R{[0-9.]+}",
- "avg-tuple-size": "R{[0-9.]+}",
- "min-tuple-size": "R{[0-9.]+}",
- "max-tuple-size": "R{[0-9.]+}"
+ "cardinality-out": 10,
+ "avg-tuple-size": 140,
+ "min-tuple-size": 137,
+ "max-tuple-size": 151
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": 5,
+ "avg-tuple-size": 145,
+ "min-tuple-size": 142,
+ "max-tuple-size": 151
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}"
},
{
"name": "R{.+}",
@@ -111,17 +134,30 @@
},
{
"name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": 5,
+ "avg-tuple-size": 16,
+ "min-tuple-size": 14,
+ "max-tuple-size": 19
+ },
+ {
+ "name": "R{.+}",
"run-time": "R{5.+}",
"runtime-id": "R{.+}",
- "cardinality-out": "R{[0-9.]+}",
- "avg-tuple-size": "R{[0-9.]+}",
- "min-tuple-size": "R{[0-9.]+}",
- "max-tuple-size": "R{[0-9.]+}"
+ "cardinality-out": 5,
+ "avg-tuple-size": 161,
+ "min-tuple-size": 156,
+ "max-tuple-size": 170
},
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
- "runtime-id": "R{.+}"
+ "runtime-id": "R{.+}",
+ "cardinality-out": 1,
+ "avg-tuple-size": 0,
+ "min-tuple-size": 0,
+ "max-tuple-size": 0
}
]
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
index b024312..98d7930 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
@@ -58,7 +58,16 @@
"attempt": "R{[0-9]+}",
"partition-send-profile": [],
"counters": [
- {
+ {
+ "name": "R{.+}",
+ "run-time": "R{5.+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": 3,
+ "avg-tuple-size": 75,
+ "min-tuple-size": 67,
+ "max-tuple-size": 82
+ },
+ {
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
@@ -69,7 +78,25 @@
},
{
"name": "R{.+}",
- "run-time": "R{5.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
"cardinality-out": "R{[0-9.]+}",
"avg-tuple-size": "R{[0-9.]+}",
@@ -153,6 +180,33 @@
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
"pages-read": "R{[0-9.]+}",
"pages-read-cold": "R{[0-9.]+}",
"cardinality-out": "R{[0-9.]+}",
@@ -162,7 +216,7 @@
},
{
"name": "R{.+}",
- "run-time": "R{[0-9].+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
"cardinality-out": "R{[0-9.]+}",
"avg-tuple-size": "R{[0-9.]+}",
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java
index 2347fa5..f22f5da 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java
@@ -46,6 +46,11 @@
}
@Override
+ public String toString() {
+ return "ClosedRecordConstructor";
+ }
+
+ @Override
public IScalarEvaluator createScalarEvaluator(IEvaluatorContext ctx) throws HyracksDataException {
int n = args.length / 2;
IScalarEvaluator[] evalFields = new IScalarEvaluator[n];
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
index ef9e75b..f9d75b5 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
@@ -19,6 +19,8 @@
package org.apache.asterix.runtime.operators;
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@@ -83,7 +85,7 @@
writer.open();
IStatsCollector coll = ctx.getStatsCollector();
if (coll != null) {
- coll.add(new OperatorStats(operatorName));
+ coll.add(new OperatorStats(operatorName, INVALID_ODID));
}
INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
indexStats = new HashMap<>();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 32c1464..5c532d3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -25,6 +25,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -82,7 +83,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
-import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.exceptions.ErrorCode;
import com.fasterxml.jackson.core.JsonFactory;
@@ -107,7 +108,7 @@
private static final String OPTIMIZER_ESTIMATES = "optimizer-estimates";
private final Map<AbstractLogicalOperator, String> operatorIdentity = new HashMap<>();
private Map<Object, String> log2odid = Collections.emptyMap();
- private Map<String, ProfileInfo> profile = Collections.emptyMap();
+ private Map<String, OperatorProfile> profile = Collections.emptyMap();
private final IdCounter idCounter = new IdCounter();
private final JsonGenerator jsonGenerator;
@@ -155,15 +156,88 @@
}
}
- private class ProfileInfo {
- Map<Integer, Pair<Double, Double>> activities;
+ private class ExtendedActivityId {
+ private final OperatorDescriptorId odId;
+ private final int id;
+ private final int microId;
+ private final int subPipe;
+ private final int subId;
- ProfileInfo() {
+ ExtendedActivityId(String str) {
+ if (str.startsWith("ANID:")) {
+ str = str.substring(5);
+ int idIdx = str.lastIndexOf(':');
+ this.odId = OperatorDescriptorId.parse(str.substring(0, idIdx));
+ String[] parts = str.substring(idIdx + 1).split("\\.");
+ this.id = Integer.parseInt(parts[0]);
+ if (parts.length >= 2) {
+ this.microId = Integer.parseInt(parts[1]);
+ } else {
+ this.microId = -1;
+ }
+ if (parts.length >= 4) {
+ this.subPipe = Integer.parseInt(parts[2]);
+ this.subId = Integer.parseInt(parts[3]);
+ } else {
+ this.subPipe = -1;
+ this.subId = -1;
+ }
+ } else {
+ throw new IllegalArgumentException("Unable to parse: " + str);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(values());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof ExtendedActivityId) && Objects.equals(((ExtendedActivityId) o).values(), values());
+ }
+
+ private List<?> values() {
+ return List.of(odId, id, microId, subPipe, subId);
+ }
+
+ @Override
+ public String toString() {
+ return "ANID:" + odId + ":" + getLocalId();
+ }
+
+ private void catenateId(StringBuilder sb, int i) {
+ if (sb.length() == 0) {
+ sb.append(i);
+ return;
+ }
+ sb.append(".");
+ sb.append(i);
+ }
+
+ public String getLocalId() {
+ StringBuilder sb = new StringBuilder();
+ catenateId(sb, odId.getId());
+ if (microId > 0) {
+ catenateId(sb, microId);
+ }
+ if (subId > 0) {
+ catenateId(sb, subPipe);
+ catenateId(sb, subId);
+ }
+ return sb.toString();
+ }
+ }
+
+ private class OperatorProfile {
+ Map<String, Pair<Double, Double>> activities;
+
+ OperatorProfile() {
activities = new HashMap<>();
}
- void visit(int id, double time) {
- Pair<Double, Double> times = activities.computeIfAbsent(id, i -> new Pair(time, time));
+ void updateOperator(String extendedOpId, double time) {
+ Pair<Double, Double> times = activities.computeIfAbsent(extendedOpId, i -> new Pair(time, time));
if (times.getFirst() > time) {
times.setFirst(time);
}
@@ -173,26 +247,27 @@
}
}
- private static ActivityId acIdFromName(String name) {
+ private ExtendedActivityId acIdFromName(String name) {
String[] parts = name.split(" - ");
- return ActivityId.parse(parts[0]);
+ return new ExtendedActivityId(parts[0]);
}
- Map<String, ProfileInfo> processProfile(ObjectNode profile) {
- Map<String, ProfileInfo> profiledOps = new HashMap<>();
+ Map<String, OperatorProfile> processProfile(ObjectNode profile) {
+ Map<String, OperatorProfile> profiledOps = new HashMap<>();
for (JsonNode joblet : profile.get("joblets")) {
for (JsonNode task : joblet.get("tasks")) {
for (JsonNode counters : task.get("counters")) {
- ProfileInfo info =
- profiledOps.computeIfAbsent(counters.get("runtime-id").asText(), i -> new ProfileInfo());
- info.visit(acIdFromName(counters.get("name").asText()).getLocalId(),
+ OperatorProfile info = profiledOps.computeIfAbsent(counters.get("runtime-id").asText(),
+ i -> new OperatorProfile());
+ info.updateOperator(acIdFromName(counters.get("name").asText()).getLocalId(),
counters.get("run-time").asDouble());
}
for (JsonNode partition : task.get("partition-send-profile")) {
String id = partition.get("partition-id").get("connector-id").asText();
- ProfileInfo info = profiledOps.computeIfAbsent(id, i -> new ProfileInfo());
+ OperatorProfile info = profiledOps.computeIfAbsent(id, i -> new OperatorProfile());
//CDIDs are unique
- info.visit(0, partition.get("close-time").asDouble() - partition.get("open-time").asDouble());
+ info.updateOperator("0",
+ partition.get("close-time").asDouble() - partition.get("open-time").asDouble());
}
}
}
@@ -268,15 +343,16 @@
String od = log2odid.get(op);
if (od != null) {
jsonGenerator.writeStringField("runtime-id", od);
- ProfileInfo info = profile.get(od);
+ OperatorProfile info = profile.get(od);
if (info != null) {
if (info.activities.size() == 1) {
- jsonGenerator.writeNumberField("min-time", info.activities.get(0).first);
- jsonGenerator.writeNumberField("max-time", info.activities.get(0).second);
+ Pair<Double, Double> minMax = info.activities.values().iterator().next();
+ jsonGenerator.writeNumberField("min-time", minMax.first);
+ jsonGenerator.writeNumberField("max-time", minMax.second);
} else {
jsonGenerator.writeObjectFieldStart("times");
- for (Map.Entry<Integer, Pair<Double, Double>> ac : info.activities.entrySet()) {
- jsonGenerator.writeObjectFieldStart(ac.getKey().toString());
+ for (Map.Entry<String, Pair<Double, Double>> ac : info.activities.entrySet()) {
+ jsonGenerator.writeObjectFieldStart(ac.getKey());
jsonGenerator.writeNumberField("min-time", ac.getValue().first);
jsonGenerator.writeNumberField("max-time", ac.getValue().second);
jsonGenerator.writeEndObject();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index d8bf190..99ef8d0 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -27,6 +27,7 @@
import java.util.Map;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -38,6 +39,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
@@ -148,11 +150,39 @@
hyracksOps.put(op, opDesc);
}
+ private String getExtendedOdidForMetaOp(ILogicalOperator op, int k) {
+ String base = metaAsterixOps.get(k).getOperatorId().toString();
+ Pair<IPushRuntimeFactory, RecordDescriptor> fact = microOps.get(op);
+ List<Pair<IPushRuntimeFactory, RecordDescriptor>> metaOpPipeline = metaAsterixOpSkeletons.get(k);
+ int pos = metaOpPipeline.indexOf(fact);
+ return base + "." + pos;
+ }
+
+ private void getExtendedOdidForSubplanOp(SubplanOperator op, Map<ILogicalOperator, String> log2phys) {
+ String baseId = getExtendedOdidForMetaOp(op, algebraicOpBelongingToMetaAsterixOp.get(op));
+ op.getNestedPlans().forEach(plan -> plan.getRoots()
+ .forEach(root -> getExtendedOdidForOperator(baseId, root.getValue(), log2phys, 0)));
+ }
+
+ private int getExtendedOdidForOperator(String baseId, ILogicalOperator op, Map<ILogicalOperator, String> log2phys,
+ int input) {
+ List<Mutable<ILogicalOperator>> inputs = op.getInputs();
+ List<Integer> paths = new ArrayList<>(inputs.size());
+ for (int i = 0; i < inputs.size(); i++) {
+ ILogicalOperator nextOp = inputs.get(i).getValue();
+ paths.add(i, getExtendedOdidForOperator(baseId, nextOp, log2phys, i + input));
+ }
+ int lPath = paths.size() > 0 ? Collections.max(paths) : 0;
+ log2phys.put(op, baseId + "." + input + "." + lPath);
+ return lPath + 1;
+ }
+
public Map<Object, String> getLogical2PhysicalMap() {
Map<ILogicalOperator, String> mergedOperatorMap = new HashMap<>();
hyracksOps.forEach(((k, v) -> mergedOperatorMap.put(k, v.getOperatorId().toString())));
- algebraicOpBelongingToMetaAsterixOp
- .forEach((k, v) -> mergedOperatorMap.put(k, metaAsterixOps.get(v).getOperatorId().toString()));
+ algebraicOpBelongingToMetaAsterixOp.forEach((k, v) -> mergedOperatorMap.put(k, getExtendedOdidForMetaOp(k, v)));
+ microOps.keySet().stream().filter(op -> op instanceof SubplanOperator)
+ .forEach(op -> getExtendedOdidForSubplanOp((SubplanOperator) op, mergedOperatorMap));
connectors.forEach((k, v) -> mergedOperatorMap.put(k, v.getFirst().getConnectorId().toString()));
return Collections.unmodifiableMap(mergedOperatorMap);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
index 82b6f9c..b1bd46b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.exceptions.SourceLocation;
public interface IPushRuntimeFactory extends Serializable {
+
IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException;
default void setSourceLocation(SourceLocation sourceLoc) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
new file mode 100644
index 0000000..354f172
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.base;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.dataflow.ITimedWriter;
+import org.apache.hyracks.api.dataflow.ProfiledFrameWriter;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+
+public class ProfiledPushRuntime extends ProfiledFrameWriter implements IPushRuntime {
+
+ private final IPushRuntime wrapped;
+ private final IOperatorStats stats;
+
+ private final boolean last;
+
+ private final Map<Integer, ITimedWriter> outputs;
+
+ public ProfiledPushRuntime(IPushRuntime push, IOperatorStats stats, boolean last) {
+ super(push);
+ outputs = new HashMap<>();
+ this.wrapped = push;
+ this.stats = stats;
+ this.last = last;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ super.close();
+ long ownTime = getTotalTime();
+ //for micro union all. accumulate the time of each input into the counter.
+ //then, on input 0, subtract the output from the accumulated time.
+ if (!last) {
+ stats.getTimeCounter().update(ownTime);
+ return;
+ }
+ ownTime += stats.getTimeCounter().get();
+ for (ITimedWriter w : outputs.values()) {
+ ownTime -= w.getTotalTime();
+ }
+ stats.getTimeCounter().set(ownTime);
+ }
+
+ public IOperatorStats getStats() {
+ return stats;
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ if (writer instanceof ITimedWriter) {
+ ITimedWriter wrapper = (ITimedWriter) writer;
+ wrapper.setUpstreamStats(stats);
+ outputs.put(index, wrapper);
+ }
+ wrapped.setOutputFrameWriter(index, writer, recordDesc);
+ }
+
+ @Override
+ public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+ wrapped.setInputRecordDescriptor(index, recordDescriptor);
+ }
+
+ public static IPushRuntime time(IPushRuntime push, IOperatorStats stats, boolean last) throws HyracksDataException {
+ if (!(push instanceof ProfiledPushRuntime)) {
+ return new ProfiledPushRuntime(push, stats, last);
+ } else {
+ return push;
+ }
+ }
+
+ public static IPushRuntime time(IPushRuntime push, IOperatorStats stats) throws HyracksDataException {
+ return time(push, stats, true);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index b3fef7f..2a3fa7e 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -19,26 +19,34 @@
package org.apache.hyracks.algebricks.runtime.operators.meta;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.ITimedWriter;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
+import org.apache.hyracks.api.job.profiling.OperatorStats;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 3L;
// array of factories for building the local runtime pipeline
private final AlgebricksPipeline pipeline;
@@ -85,6 +93,68 @@
}
}
+ private static String makeStatName(String base, String name, int pos, int input, int subPlan, int subPos) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(base);
+ sb.append(".");
+ sb.append(pos);
+ if (subPlan >= 0) {
+ sb.append(".");
+ sb.append(subPlan);
+ sb.append(".");
+ sb.append(subPos);
+ sb.append(" - Subplan ");
+ } else {
+ sb.append(" - MicroOp ");
+ }
+ sb.append(name);
+ if (input >= 0) {
+ sb.append(" input [");
+ sb.append(input);
+ sb.append("] ");
+ }
+ return sb.toString();
+ }
+
+ private static String makeId(String base, int id, int subPlan, int subPos) {
+ return base + "." + id + (subPlan >= 0 ? "." + subPlan : "") + (subPos >= 0 ? "." + subPos : "");
+ }
+
+ private static IOperatorStats makeStatForRuntimeFact(IPushRuntimeFactory factory, String base, String baseId,
+ int pos, int subPlan, int subPos) {
+ return new OperatorStats(makeStatName(base, factory.toString(), pos, -1, subPlan, subPos),
+ makeId(baseId, pos, subPlan, subPos));
+ }
+
+ public static Map<IPushRuntimeFactory, IOperatorStats> makeMicroOpStats(AlgebricksPipeline pipe,
+ IOperatorStats outerStats) {
+ Map<IPushRuntimeFactory, IOperatorStats> microOpStats = new HashMap<>();
+ String baseName = outerStats.getName().split(" - ")[0];
+ String baseId = outerStats.getOperatorId();
+ List<SubplanRuntimeFactory> subplans = new ArrayList<>();
+ for (int i = 0; i < pipe.getRuntimeFactories().length; i++) {
+ IPushRuntimeFactory fact = pipe.getRuntimeFactories()[i];
+ //TODO: don't use instanceof
+ if (fact instanceof SubplanRuntimeFactory) {
+ SubplanRuntimeFactory subplanFact = (SubplanRuntimeFactory) fact;
+ subplans.add(subplanFact);
+ List<AlgebricksPipeline> pipelines = subplanFact.getPipelines();
+ for (AlgebricksPipeline p : pipelines) {
+ IPushRuntimeFactory[] subplanFactories = p.getRuntimeFactories();
+ for (int j = subplanFactories.length - 1; j > 0; j--) {
+ microOpStats.put(subplanFactories[j], makeStatForRuntimeFact(subplanFactories[j], baseName,
+ baseId, i, pipelines.indexOf(p), j));
+ }
+ }
+ }
+ microOpStats.put(fact, makeStatForRuntimeFact(fact, baseName, baseId, i, -1, -1));
+ }
+ for (SubplanRuntimeFactory sub : subplans) {
+ sub.setStats(microOpStats);
+ }
+ return microOpStats;
+ }
+
private class SourcePushRuntime extends AbstractUnaryOutputSourceOperatorNodePushable {
private final IHyracksTaskContext ctx;
@@ -99,7 +169,7 @@
outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.outRecDescs[0] : null;
PipelineAssembler pa =
new PipelineAssembler(pipeline, inputArity, outputArity, null, pipelineOutputRecordDescriptor);
- startOfPipeline = pa.assemblePipeline(writer, ctx);
+ startOfPipeline = pa.assemblePipeline(writer, ctx, new HashMap<>());
HyracksDataException exception = null;
try {
startOfPipeline.open();
@@ -126,16 +196,18 @@
public String getDisplayName() {
return "Empty Tuple Source";
}
+
}
private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider) {
- return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ return new AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable() {
private IFrameWriter startOfPipeline;
private boolean opened = false;
+ private IOperatorStats parentStats = NoOpOperatorStats.INSTANCE;
+ private Map<IPushRuntimeFactory, IOperatorStats> microOpStats = new HashMap<>();
- @Override
public void open() throws HyracksDataException {
if (startOfPipeline == null) {
RecordDescriptor pipelineOutputRecordDescriptor =
@@ -144,7 +216,7 @@
.getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
- startOfPipeline = pa.assemblePipeline(writer, ctx);
+ startOfPipeline = pa.assemblePipeline(writer, ctx, microOpStats);
}
opened = true;
startOfPipeline.open();
@@ -175,9 +247,38 @@
}
@Override
+ public void deinitialize() throws HyracksDataException {
+ super.deinitialize();
+ }
+
+ @Override
public String toString() {
return AlgebricksMetaOperatorDescriptor.this.toString();
}
+
+ @Override
+ public void addStats(IOperatorStats stats) throws HyracksDataException {
+ microOpStats = makeMicroOpStats(pipeline, stats);
+ for (IOperatorStats stat : microOpStats.values()) {
+ ctx.getStatsCollector().add(stat);
+ }
+ }
+
+ @Override
+ public void setUpstreamStats(IOperatorStats stats) {
+ parentStats = stats;
+ }
+
+ @Override
+ public long getTotalTime() {
+ return startOfPipeline instanceof ITimedWriter ? ((ITimedWriter) startOfPipeline).getTotalTime() : 0;
+ }
+
+ @Override
+ public IOperatorStats getStats() {
+ IPushRuntimeFactory[] facts = pipeline.getRuntimeFactories();
+ return microOpStats.getOrDefault(facts[facts.length - 1], NoOpOperatorStats.INSTANCE);
+ }
};
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 858fcfa..0ce2ff5 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -25,6 +25,8 @@
import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
@@ -32,6 +34,7 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
public class PipelineAssembler {
@@ -55,6 +58,11 @@
}
public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException {
+ return assemblePipeline(writer, ctx, new HashMap<>());
+ }
+
+ public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx,
+ Map<IPushRuntimeFactory, IOperatorStats> microOpStats) throws HyracksDataException {
// should enforce protocol
boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
@@ -67,7 +75,21 @@
IPushRuntimeFactory runtimeFactory = runtimeFactories[i];
IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx);
for (int j = 0; j < newRuntimes.length; j++) {
- if (enforce) {
+ //ETS is wrapped externally, and doesn't need the micro-op wrapper since it isn't a pipeline
+ //we also want to avoid any instances of NoOp stats in the pipeline that snuck in somehow
+ boolean shouldProfile = !(runtimeFactory instanceof EmptyTupleSourceRuntimeFactory) && profile
+ && microOpStats.containsKey(runtimeFactory);
+ if (shouldProfile) {
+ ProfiledPushRuntime profiled;
+ if (j == 0) {
+ profiled = (ProfiledPushRuntime) ProfiledPushRuntime.time(newRuntimes[j],
+ microOpStats.get(runtimeFactory));
+ } else {
+ profiled = (ProfiledPushRuntime) ProfiledPushRuntime.time(newRuntimes[j],
+ microOpStats.get(runtimeFactory), false);
+ }
+ newRuntimes[j] = profiled;
+ } else if (enforce && !profile) {
newRuntimes[j] = EnforcePushRuntime.enforce(newRuntimes[j]);
}
if (i == runtimeFactories.length - 1) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 349e50f..7feca3c 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -20,7 +20,9 @@
import java.io.DataOutput;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
@@ -32,16 +34,19 @@
import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ProfiledFrameWriter;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 3L;
private final List<AlgebricksPipeline> pipelines;
@@ -51,6 +56,8 @@
private final IMissingWriterFactory[] missingWriterFactories;
+ private final Map<IPushRuntimeFactory, IOperatorStats> stats;
+
public SubplanRuntimeFactory(List<AlgebricksPipeline> pipelines, IMissingWriterFactory[] missingWriterFactories,
RecordDescriptor inputRecordDesc, RecordDescriptor outputRecordDesc, int[] projectionList) {
super(projectionList);
@@ -61,6 +68,7 @@
if (projectionList != null) {
throw new NotImplementedException();
}
+ this.stats = new HashMap<>();
}
@Override
@@ -78,6 +86,14 @@
return sb.toString();
}
+ public List<AlgebricksPipeline> getPipelines() {
+ return pipelines;
+ }
+
+ public void setStats(Map<IPushRuntimeFactory, IOperatorStats> stats) {
+ this.stats.putAll(stats);
+ }
+
@Override
public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws HyracksDataException {
@@ -92,8 +108,11 @@
boolean first;
+ boolean profile;
+
SubplanPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
this.ctx = ctx;
+ this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
this.first = true;
IMissingWriter[] missingWriters = new IMissingWriter[missingWriterFactories.length];
@@ -114,6 +133,11 @@
if (i == 0) {
// primary pipeline
outputWriter = new TupleOuterProduct(pipelineLastRecordDescriptor, missingWriters);
+ //this is necessary to track the output of the last operator to the outer product,
+ //i.e. the last real operator in pipeline 0 of the subplan
+ if (profile) {
+ outputWriter = new ProfiledFrameWriter(outputWriter);
+ }
outputRecordDescriptor = SubplanRuntimeFactory.this.outputRecordDesc;
} else {
// secondary pipeline
@@ -127,7 +151,8 @@
}
PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor);
- startOfPipelines[i] = (NestedTupleSourceRuntime) pa.assemblePipeline(outputWriter, ctx);
+ IFrameWriter head = pa.assemblePipeline(outputWriter, ctx, stats);
+ startOfPipelines[i] = (NestedTupleSourceRuntime) head;
pipelineAssemblers[i] = pa;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ISelfProfilingNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ISelfProfilingNodePushable.java
new file mode 100644
index 0000000..7402179
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ISelfProfilingNodePushable.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.dataflow;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+
+public interface ISelfProfilingNodePushable extends IStatsContainingNodePushable {
+ void addStats(IOperatorStats stats) throws HyracksDataException;
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IStatsContainingNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IStatsContainingNodePushable.java
new file mode 100644
index 0000000..96a3ae9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IStatsContainingNodePushable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+
+public interface IStatsContainingNodePushable {
+ IOperatorStats getStats();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java
new file mode 100644
index 0000000..7b0f8c8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+
+public interface ITimedWriter extends IFrameWriter {
+ void setUpstreamStats(IOperatorStats stats);
+
+ long getTotalTime();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
index dc53bca..cfb0e7b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.api.dataflow;
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
import java.nio.ByteBuffer;
import org.apache.hyracks.api.com.job.profiling.counters.Counter;
@@ -28,30 +30,33 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
import org.apache.hyracks.api.job.profiling.OperatorStats;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
import org.apache.hyracks.api.util.HyracksRunnable;
import org.apache.hyracks.api.util.HyracksThrowingConsumer;
import org.apache.hyracks.util.IntSerDeUtils;
-public class ProfiledFrameWriter implements IFrameWriter {
+public class ProfiledFrameWriter implements ITimedWriter {
// The downstream data consumer of this writer.
private final IFrameWriter writer;
- private final ICounter tupleCounter;
- private final IOperatorStats parentStats;
+ protected IOperatorStats upstreamStats = NoOpOperatorStats.INSTANCE;
private int minSz = Integer.MAX_VALUE;
private int maxSz = -1;
private long avgSz;
private ICounter totalTime;
- public ProfiledFrameWriter(IFrameWriter writer, IOperatorStats parentStats) {
+ public ProfiledFrameWriter(IFrameWriter writer) {
this.writer = writer;
- this.parentStats = parentStats;
- this.tupleCounter = parentStats != null ? parentStats.getTupleCounter() : null;
this.totalTime = new Counter("totalTime");
}
+ @Override
+ public void setUpstreamStats(IOperatorStats stats) {
+ this.upstreamStats = stats;
+ }
+
public static void timeMethod(HyracksRunnable r, ICounter c) throws HyracksDataException {
long nt = 0;
try {
@@ -80,25 +85,24 @@
private void updateTupleStats(ByteBuffer buffer) {
int tupleCountOffset = FrameHelper.getTupleCountOffset(buffer.limit());
int tupleCount = IntSerDeUtils.getInt(buffer.array(), tupleCountOffset);
- if (tupleCounter != null) {
- long prevCount = tupleCounter.get();
- for (int i = 0; i < tupleCount; i++) {
- int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
- if (maxSz < tupleLen) {
- maxSz = tupleLen;
- }
- if (minSz > tupleLen) {
- minSz = tupleLen;
- }
- long prev = avgSz * prevCount;
- avgSz = (prev + tupleLen) / (prevCount + 1);
- prevCount++;
+ ICounter tupleCounter = upstreamStats.getTupleCounter();
+ long prevCount = tupleCounter.get();
+ for (int i = 0; i < tupleCount; i++) {
+ int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
+ if (maxSz < tupleLen) {
+ maxSz = tupleLen;
}
- parentStats.getMaxTupleSz().set(maxSz);
- parentStats.getMinTupleSz().set(minSz);
- parentStats.getAverageTupleSz().set(avgSz);
- tupleCounter.update(tupleCount);
+ if (minSz > tupleLen) {
+ minSz = tupleLen;
+ }
+ long prev = avgSz * prevCount;
+ avgSz = (prev + tupleLen) / (prevCount + 1);
+ prevCount++;
}
+ upstreamStats.getMaxTupleSz().set(maxSz);
+ upstreamStats.getMinTupleSz().set(minSz);
+ upstreamStats.getAverageTupleSz().set(avgSz);
+ tupleCounter.update(tupleCount);
}
@Override
@@ -140,14 +144,15 @@
throws HyracksDataException {
if (!(writer instanceof ProfiledFrameWriter)) {
IStatsCollector statsCollector = ctx.getStatsCollector();
- IOperatorStats stats = new OperatorStats(name);
+ IOperatorStats stats = new OperatorStats(name, INVALID_ODID);
statsCollector.add(stats);
- return new ProfiledFrameWriter(writer, null);
-
- } else
+ return new ProfiledFrameWriter(writer);
+ } else {
return writer;
+ }
}
+ @Override
public long getTotalTime() {
return totalTime.get();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
index bde5611..1984d8e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
@@ -32,20 +32,16 @@
import org.apache.hyracks.api.job.profiling.counters.ICounter;
import org.apache.hyracks.api.rewriter.runtime.SuperActivityOperatorNodePushable;
-public class ProfiledOperatorNodePushable implements IOperatorNodePushable {
+public class ProfiledOperatorNodePushable implements IOperatorNodePushable, IStatsContainingNodePushable {
private final IOperatorNodePushable op;
- private final Map<Integer, ProfiledFrameWriter> inputs;
- private final Map<Integer, ProfiledOperatorNodePushable> parents;
- private final Map<Integer, ProfiledFrameWriter> outputs;
+ private final Map<Integer, ITimedWriter> inputs;
+ private final Map<Integer, ITimedWriter> outputs;
private final IOperatorStats stats;
private final ICounter totalTime;
- ProfiledOperatorNodePushable(IOperatorNodePushable op, IOperatorStats stats,
- ProfiledOperatorNodePushable parentOp) {
+ ProfiledOperatorNodePushable(IOperatorNodePushable op, IOperatorStats stats) {
this.stats = stats;
- this.parents = new HashMap<>();
- parents.put(0, parentOp);
this.op = op;
inputs = new HashMap<>();
outputs = new HashMap<>();
@@ -60,10 +56,10 @@
@Override
public void deinitialize() throws HyracksDataException {
long ownTime = totalTime.get();
- for (ProfiledFrameWriter i : inputs.values()) {
+ for (ITimedWriter i : inputs.values()) {
ownTime += i.getTotalTime();
}
- for (ProfiledFrameWriter w : outputs.values()) {
+ for (ITimedWriter w : outputs.values()) {
ownTime -= w.getTotalTime();
}
op.deinitialize();
@@ -78,8 +74,13 @@
@Override
public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
throws HyracksDataException {
- if (writer instanceof ProfiledFrameWriter) {
- ProfiledFrameWriter wrapper = (ProfiledFrameWriter) writer;
+ if (writer instanceof ITimedWriter) {
+ ITimedWriter wrapper = (ITimedWriter) writer;
+ if (op instanceof ISelfProfilingNodePushable) {
+ wrapper.setUpstreamStats(((ISelfProfilingNodePushable) op).getStats());
+ } else {
+ wrapper.setUpstreamStats(stats);
+ }
outputs.put(index, wrapper);
}
op.setOutputFrameWriter(index, writer, recordDesc);
@@ -88,8 +89,7 @@
@Override
public IFrameWriter getInputFrameWriter(int index) {
if (inputs.get(index) == null) {
- IOperatorStats parentStats = parents.get(index) == null ? null : parents.get(index).getStats();
- ProfiledFrameWriter pfw = new ProfiledFrameWriter(op.getInputFrameWriter(index), parentStats);
+ ProfiledFrameWriter pfw = new ProfiledFrameWriter(op.getInputFrameWriter(index));
inputs.put(index, pfw);
return pfw;
} else {
@@ -102,25 +102,26 @@
return op.getDisplayName();
}
- public void addParent(int index, ProfiledOperatorNodePushable parent) {
- parents.put(index, parent);
- }
-
public IOperatorStats getStats() {
return stats;
}
- public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx, ActivityId acId,
- ProfiledOperatorNodePushable source) throws HyracksDataException {
+ public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx, ActivityId acId)
+ throws HyracksDataException {
String name = acId.toString() + " - " + op.getDisplayName();
IStatsCollector statsCollector = ctx.getStatsCollector();
- IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId());
- statsCollector.add(stats);
+ IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId().toString());
+ if (!(op instanceof ISelfProfilingNodePushable)) {
+ statsCollector.add(stats);
+ }
if (op instanceof IIntrospectingOperator) {
((IIntrospectingOperator) op).setOperatorStats(stats);
}
+ if (op instanceof ISelfProfilingNodePushable) {
+ ((ISelfProfilingNodePushable) op).addStats(stats);
+ }
if (!(op instanceof ProfiledOperatorNodePushable) && !(op instanceof SuperActivityOperatorNodePushable)) {
- return new ProfiledOperatorNodePushable(op, stats, source);
+ return new ProfiledOperatorNodePushable(op, stats);
}
return op;
}
@@ -129,7 +130,7 @@
throws HyracksDataException {
String name = acId.toString() + " - " + op.getDisplayName();
IStatsCollector statsCollector = ctx.getStatsCollector();
- IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId());
+ IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId().toString());
if (op instanceof IIntrospectingOperator) {
((IIntrospectingOperator) op).setOperatorStats(stats);
statsCollector.add(stats);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
index 26b3a58..7770c4f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
@@ -18,10 +18,11 @@
*/
package org.apache.hyracks.api.job.profiling;
+import java.io.DataInput;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.io.IWritable;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
@@ -86,11 +87,22 @@
ICounter getBytesWritten();
- OperatorDescriptorId getId();
+ String getOperatorId();
void updateIndexesStats(Map<String, IndexStats> indexesStats);
Map<String, IndexStats> getIndexesStats();
void updateFrom(IOperatorStats stats);
+
+ static IOperatorStats create(DataInput input) throws IOException {
+ String name = input.readUTF();
+ if (NoOpOperatorStats.NOOP_NAME.equals(name)) {
+ return NoOpOperatorStats.INSTANCE;
+ }
+ String operatorId = input.readUTF();
+ OperatorStats operatorStats = new OperatorStats(name, operatorId);
+ operatorStats.readFields(input);
+ return operatorStats;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java
index 4c86a15..d427d14 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java
@@ -24,14 +24,15 @@
import java.util.Collections;
import java.util.Map;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
public class NoOpOperatorStats implements IOperatorStats {
- private static final long serialVersionUID = 9055940822300360135L;
+ private static final long serialVersionUID = 9055940222300360256L;
public static final NoOpOperatorStats INSTANCE = new NoOpOperatorStats();
+ public static final String INVALID_ODID = "ODID:-1";
+ public static final String NOOP_NAME = "NoOp";
private static final ICounter NOOP_COUNTER = new ICounter() {
private static final long serialVersionUID = 1L;
@@ -57,21 +58,20 @@
}
};
- private static final OperatorDescriptorId INVALID_ODID = new OperatorDescriptorId(-1);
-
@Override
public void writeFields(DataOutput output) throws IOException {
-
+ output.writeUTF(NOOP_NAME);
+ output.writeUTF(INVALID_ODID);
}
@Override
public void readFields(DataInput input) throws IOException {
-
+ // nothing
}
@Override
public String getName() {
- return "";
+ return NOOP_NAME;
}
@Override
@@ -130,7 +130,7 @@
}
@Override
- public OperatorDescriptorId getId() {
+ public String getOperatorId() {
return INVALID_ODID;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
index c9f08e0..412b788 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
@@ -25,14 +25,13 @@
import java.util.Map;
import org.apache.hyracks.api.com.job.profiling.counters.Counter;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
public class OperatorStats implements IOperatorStats {
- private static final long serialVersionUID = 6401830963367567169L;
-
+ private static final long serialVersionUID = 6401830963361567126L;
public final String operatorName;
- public OperatorDescriptorId id;
+
+ public final String operatorId;
public final ICounter tupleCounter;
public final ICounter timeCounter;
public final ICounter pageReads;
@@ -48,12 +47,12 @@
//TODO: this is quickly becoming gross it should just be a map where the value is obliged to be a Counter
- public OperatorStats(String operatorName, OperatorDescriptorId id) {
+ public OperatorStats(String operatorName, String operatorId) {
if (operatorName == null || operatorName.isEmpty()) {
throw new IllegalArgumentException("operatorName must not be null or empty");
}
this.operatorName = operatorName;
- this.id = id;
+ this.operatorId = operatorId;
tupleCounter = new Counter("tupleCounter");
timeCounter = new Counter("timeCounter");
pageReads = new Counter("diskIoCounter");
@@ -69,17 +68,6 @@
indexesStats = new HashMap<>();
}
- public OperatorStats(String operatorName) {
- this(operatorName, new OperatorDescriptorId(-1));
- }
-
- public static IOperatorStats create(DataInput input) throws IOException {
- String name = input.readUTF();
- OperatorStats operatorStats = new OperatorStats(name);
- operatorStats.readFields(input);
- return operatorStats;
- }
-
@Override
public String getName() {
return operatorName;
@@ -141,8 +129,8 @@
}
@Override
- public OperatorDescriptorId getId() {
- return id;
+ public String getOperatorId() {
+ return operatorId;
}
@Override
@@ -178,7 +166,7 @@
@Override
public void writeFields(DataOutput output) throws IOException {
output.writeUTF(operatorName);
- id.writeFields(output);
+ output.writeUTF(operatorId);
output.writeLong(tupleCounter.get());
output.writeLong(timeCounter.get());
output.writeLong(pageReads.get());
@@ -195,7 +183,6 @@
@Override
public void readFields(DataInput input) throws IOException {
- id = OperatorDescriptorId.create(input);
tupleCounter.set(input.readLong());
timeCounter.set(input.readLong());
pageReads.set(input.readLong());
@@ -229,13 +216,13 @@
@Override
public String toString() {
- return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"" + tupleCounter.getName() + "\": "
- + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": " + timeCounter.get() + ", \""
- + coldReadCounter.getName() + "\": " + coldReadCounter.get() + avgTupleSz.getName() + "\": "
- + avgTupleSz.get() + ", \"" + minTupleSz.getName() + "\": " + minTupleSz.get() + ", \""
- + minTupleSz.getName() + "\": " + timeCounter.get() + ", \"" + inputTupleCounter.getName() + "\": "
- + bytesRead.get() + ", \"" + bytesRead.getName() + "\": " + bytesWritten.get() + ", \""
- + bytesWritten.getName() + "\": " + inputTupleCounter.get() + ", \"" + level.getName() + "\": "
- + level.get() + ", \"indexStats\": \"" + indexesStats + "\" }";
+ return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"id\": \"" + operatorId + "\", " + "\""
+ + tupleCounter.getName() + "\": " + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": "
+ + timeCounter.get() + ", \"" + coldReadCounter.getName() + "\": " + coldReadCounter.get()
+ + avgTupleSz.getName() + "\": " + avgTupleSz.get() + ", \"" + minTupleSz.getName() + "\": "
+ + minTupleSz.get() + ", \"" + minTupleSz.getName() + "\": " + timeCounter.get() + ", \""
+ + inputTupleCounter.getName() + "\": " + bytesRead.get() + ", \"" + bytesRead.getName() + "\": "
+ + bytesWritten.get() + ", \"" + bytesWritten.getName() + "\": " + inputTupleCounter.get() + ", \""
+ + level.getName() + "\": " + level.get() + ", \"indexStats\": \"" + indexesStats + "\" }";
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 4261c1e..17f5cb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -109,11 +109,11 @@
*/
Set<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> sources = new HashSet<>();
for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
- IOperatorNodePushable opPushable = null;
+ IOperatorNodePushable opPushable;
if (profile) {
IOperatorNodePushable wrapped =
entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
- opPushable = ProfiledOperatorNodePushable.time(wrapped, ctx, entry.getKey(), null);
+ opPushable = ProfiledOperatorNodePushable.time(wrapped, ctx, entry.getKey());
} else {
opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
ProfiledOperatorNodePushable.onlyAddStats(opPushable, ctx, entry.getKey());
@@ -147,12 +147,7 @@
if (profile) {
IOperatorNodePushable wrapped = channel.getRight().getLeft().createPushRuntime(ctx,
recordDescProvider, partition, nPartitions);
- if (sourceOp instanceof ProfiledOperatorNodePushable) {
- destOp = ProfiledOperatorNodePushable.time(wrapped, ctx, destId,
- (ProfiledOperatorNodePushable) sourceOp);
- } else {
- destOp = ProfiledOperatorNodePushable.time(wrapped, ctx, destId, null);
- }
+ destOp = ProfiledOperatorNodePushable.time(wrapped, ctx, destId);
} else {
destOp = channel.getRight().getLeft().createPushRuntime(ctx, recordDescProvider, partition,
nPartitions);
@@ -160,12 +155,6 @@
}
operatorNodePushablesBFSOrder.add(destOp);
operatorNodePushables.put(destId, destOp);
- } else if (profile) {
- if (destOp instanceof ProfiledOperatorNodePushable
- && sourceOp instanceof ProfiledOperatorNodePushable) {
- ((ProfiledOperatorNodePushable) destOp).addParent(inputChannel,
- (ProfiledOperatorNodePushable) sourceOp);
- }
}
/*
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
index 26c664a..7cfbda0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.control.common.job.profiling;
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -31,7 +33,7 @@
import org.apache.hyracks.api.job.profiling.OperatorStats;
public class StatsCollector implements IStatsCollector {
- private static final long serialVersionUID = 6858817639895434574L;
+ private static final long serialVersionUID = 6858817639895434379L;
private final Map<String, IOperatorStats> operatorStatsMap = new LinkedHashMap<>();
@@ -61,7 +63,7 @@
@Override
public IOperatorStats getAggregatedStats() {
- IOperatorStats aggregatedStats = new OperatorStats("aggregated");
+ IOperatorStats aggregatedStats = new OperatorStats("aggregated", INVALID_ODID);
for (IOperatorStats stats : operatorStatsMap.values()) {
aggregatedStats.getInputTupleCounter().update(stats.getInputTupleCounter().get());
aggregatedStats.getTimeCounter().update(stats.getTimeCounter().get());
@@ -74,8 +76,8 @@
@Override
public void writeFields(DataOutput output) throws IOException {
output.writeInt(operatorStatsMap.size());
- for (IOperatorStats operatorStats : operatorStatsMap.values()) {
- operatorStats.writeFields(output);
+ for (IOperatorStats stats : operatorStatsMap.values()) {
+ stats.writeFields(output);
}
}
@@ -83,7 +85,7 @@
public void readFields(DataInput input) throws IOException {
int operatorCount = input.readInt();
for (int i = 0; i < operatorCount; i++) {
- IOperatorStats opStats = OperatorStats.create(input);
+ IOperatorStats opStats = IOperatorStats.create(input);
operatorStatsMap.put(opStats.getName(), opStats);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
index 871d2bc..40bc1ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.control.common.job.profiling.om;
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -184,7 +186,7 @@
}
IOperatorStats opOutStats = outStats[i];
if (opOutStats == null) {
- opOutStats = new OperatorStats(operatorName);
+ opOutStats = new OperatorStats(operatorName, INVALID_ODID);
outStats[i] = opOutStats;
}
opOutStats.updateFrom(opTaskStats);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 546360a..84376f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.control.common.job.profiling.om;
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -130,8 +132,8 @@
jpe.put("name", key);
jpe.put("run-time", Double
.parseDouble(new DecimalFormat("#.####").format((double) value.getTimeCounter().get() / 1000000)));
- if (value.getId().getId() >= 0) {
- jpe.put("runtime-id", value.getId().toString());
+ if (!value.getOperatorId().equals(INVALID_ODID)) {
+ jpe.put("runtime-id", value.getOperatorId());
}
if (value.getPageReads().get() > 0) {
jpe.put("pages-read", value.getPageReads().get());
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable.java
new file mode 100644
index 0000000..7f0f5a9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.base;
+
+import org.apache.hyracks.api.dataflow.ISelfProfilingNodePushable;
+import org.apache.hyracks.api.dataflow.ITimedWriter;
+
+public abstract class AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable
+ extends AbstractUnaryInputUnaryOutputOperatorNodePushable implements ISelfProfilingNodePushable, ITimedWriter {
+
+}