Merge "Merge branch 'gerrit/trinity' into 'master'"
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 9517cf0..9bd65a6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -108,6 +109,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.resource.IClusterCapacity;
@@ -205,7 +207,8 @@
public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
Query query, int varCounter, String outputDatasetName, SessionOutput output, ICompiledStatement statement,
Map<VarIdentifier, IAObject> externalVars, IResponsePrinter printer, IWarningCollector warningCollector,
- IRequestParameters requestParameters) throws AlgebricksException, ACIDException {
+ IRequestParameters requestParameters, EnumSet<JobFlag> runtimeFlags)
+ throws AlgebricksException, ACIDException {
// establish facts
final boolean isQuery = query != null;
@@ -326,7 +329,7 @@
JobEventListenerFactory jobEventListenerFactory =
new JobEventListenerFactory(txnId, metadataProvider.isWriteTransaction());
- JobSpecification spec = compiler.createJob(ccAppContext, jobEventListenerFactory);
+ JobSpecification spec = compiler.createJob(ccAppContext, jobEventListenerFactory, runtimeFlags);
if (isQuery) {
if (!compiler.skipJobCapacityAssignment()) {
@@ -348,7 +351,10 @@
if (isQuery || isLoad || isCopy) {
generateOptimizedLogicalPlan(plan, spec.getLogical2PhysicalMap(), output.config().getPlanFormat(),
cboMode);
- lastPlan = new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat());
+ if (runtimeFlags.contains(JobFlag.PROFILE_RUNTIME)) {
+ lastPlan =
+ new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat());
+ }
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index d1a1008..7ac1431 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -100,7 +100,7 @@
metadata.setQueueWaitTimeInNanos(run.getJobProfile().getQueueWaitTimeInNanos());
}
metadata.setProcessedObjects(processedObjects);
- metadata.setBufferCacheHitRatio(pagesRead > 0 ? (pagesRead - nonPagedReads) / pagesRead : Double.NaN);
+ metadata.setBufferCacheHitRatio(pagesRead > 0 ? (pagesRead - nonPagedReads) / (double) pagesRead : Double.NaN);
metadata.setWarnings(AggregateWarnings);
metadata.setTotalWarningsCount(aggregateTotalWarningsCount);
if (run != null && run.getFlags() != null && run.getFlags().contains(JobFlag.PROFILE_RUNTIME)) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d52429f..a99fc22 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -3870,7 +3870,7 @@
loadStmt.getDatasetName(), loadStmt.getAdapter(), properties, loadStmt.dataIsAlreadySorted());
cls.setSourceLocation(stmt.getSourceLocation());
JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
- null, responsePrinter, warningCollector, null);
+ null, responsePrinter, warningCollector, null, jobFlags);
afterCompile();
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
@@ -3948,7 +3948,7 @@
copyStmt.getDatasetName(), itemType, externalDetails.getAdapter(), properties);
cls.setSourceLocation(stmt.getSourceLocation());
JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
- null, responsePrinter, warningCollector, null);
+ null, responsePrinter, warningCollector, null, jobFlags);
afterCompile();
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
@@ -4041,7 +4041,7 @@
// Query Compilation (happens under the same ongoing metadata transaction)
final JobSpecification jobSpec = apiFramework.compileQuery(hcc, metadataProvider, copyTo.getQuery(),
rewrittenResult.second, null, sessionOutput, compiledCopyToStatement, externalVars,
- responsePrinter, warningCollector, requestParameters);
+ responsePrinter, warningCollector, requestParameters, jobFlags);
// update stats with count of compile-time warnings. needs to be adapted for multi-statement.
stats.updateTotalWarningsCount(warningCollector.getTotalWarningsCount());
afterCompile();
@@ -4238,7 +4238,7 @@
// Query Compilation (happens under the same ongoing metadata transaction)
return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, (Query) rewrittenResult.first,
rewrittenResult.second, stmt == null ? null : stmt.getDatasetName(), sessionOutput, stmt, externalVars,
- responsePrinter, warningCollector, requestParameters);
+ responsePrinter, warningCollector, requestParameters, jobFlags);
}
protected JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector clusterInfoCollector,
@@ -4281,7 +4281,7 @@
// transaction)
return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, rewrittenInsertUpsert.getQuery(),
rewrittenResult.second, datasetName, sessionOutput, clfrqs, externalVars, responsePrinter,
- warningCollector, null);
+ warningCollector, null, jobFlags);
}
protected void handleCreateFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java
index 7c945fc..413f002 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java
@@ -30,9 +30,6 @@
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-/**
- * Runs the cluster state runtime tests with the storage parallelism.
- */
@RunWith(Parameterized.class)
public class ProfiledExecutionTest {
protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/cc-single.conf";
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
index a404639..d3664d4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
@@ -44,5 +44,11 @@
<output-dir compare="Text">sleep</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="profile">
+ <compilation-unit name="non-unary-subplan">
+ <parameter name="profile" value="timings" type="string"/>
+ <output-dir compare="Text">non-unary-subplan</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.1.ddl.sqlpp
new file mode 100644
index 0000000..c1ce4a3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.1.ddl.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create type TType as open
+{ id: bigint };
+
+create dataset TData (TType) primary key id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.2.update.sqlpp
new file mode 100644
index 0000000..3ce9554
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.2.update.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into TData ( [
+{'id':1, 'x':1, 'f':19},
+{'id':2, 'x':2, 'f':12},
+{'id':3, 'x':1, 'f':10},
+{'id':4, 'x':2, 'f':17},
+{'id':5, 'x':1, 'f':12}
+]);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.3.profile.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.3.profile.sqlpp
new file mode 100644
index 0000000..4fa986c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/non-unary-subplan/non-unary-subplan.3.profile.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- compareunorderedarray=true
+use test;
+
+select value
+array_sum((
+ select value a.f
+ from g as p
+ union all
+ select value a.f
+ from g as w
+))
+from TData as a
+group by a.x as x group as g
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp
index 82d030e..6c35376 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/sleep/sleep.5.profile.sqlpp
@@ -24,4 +24,4 @@
FROM Customers c
WHERE c.age <65
GROUP BY c.address.city
-ORDER BY sleep(city,1667);
\ No newline at end of file
+ORDER BY sleep(city,1700);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
index 88107b0..03900f3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
@@ -77,6 +77,24 @@
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}"
}
]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson
new file mode 100644
index 0000000..e59f095
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/non-unary-subplan/non-unary-subplan.3.regexjson
@@ -0,0 +1,241 @@
+{
+ "job-id": "R{[A-Z0-9.:]+}",
+ "create-time": "R{[0-9.]+}",
+ "start-time": "R{[0-9.]+}",
+ "queued-time": "R{.+}",
+ "end-time": "R{[0-9.]+}",
+ "counters": [],
+ "joblets": [
+ {
+ "node-id": "R{.+}",
+ "counters": [],
+ "tasks": [
+ {
+ "activity-id": "R{[A-Z0-9.:]+}",
+ "partition": "R{[0-9]+}",
+ "attempt": "R{[0-9]+}",
+ "partition-send-profile": [],
+ "counters": [
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}"
+ }
+ ]
+ },
+ {
+ "activity-id": "R{[A-Z0-9.:]+}",
+ "partition": "R{[0-9]+}",
+ "attempt": "R{[0-9]+}",
+ "partition-send-profile": [],
+ "counters": [
+ {
+ "name": "R{ANID:ODID:[0-9]:0\\.1 - MicroOp Subplan(?:.|\n)+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}"
+ }
+ ]
+ },
+ {
+ "activity-id": "R{[A-Z0-9.:]+}",
+ "partition": "R{[0-9]+}",
+ "attempt": "R{[0-9]+}",
+ "partition-send-profile": [
+ {
+ "partition-id": {
+ "job-id": "R{[A-Z0-9.:]+}",
+ "connector-id": "R{[A-Z0-9.:]+}",
+ "sender-index": "R{[0-9]+}",
+ "receiver-index": "R{[0-9]+}"
+ },
+ "open-time": "R{[0-9]+}",
+ "close-time": "R{[0-9]+}",
+ "offset": "R{[0-9]+}",
+ "frame-times": [
+ 0
+ ],
+ "resolution": 1
+ }
+ ],
+ "counters": [
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ }
+ ]
+ }
+ ]
+ }]
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson
index bfb8c62..e6d1c0a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.3.regexjson
@@ -19,6 +19,15 @@
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}"
},
{
@@ -34,6 +43,15 @@
},
{
"name": "R{.+}",
+ "run-time": "R{5.+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": 10,
+ "avg-tuple-size": 38,
+ "min-tuple-size": 38,
+ "max-tuple-size": 38
+ },
+ {
+ "name": "R{.+}",
"run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
"cardinality-out": "R{[0-9.]+}",
@@ -43,12 +61,12 @@
},
{
"name": "R{.+}",
- "run-time": "R{5.+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
- "cardinality-out": 10,
- "avg-tuple-size": 25,
- "min-tuple-size": 25,
- "max-tuple-size": 25
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
}
]
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
index 012133e..e3e7647 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.4.regexjson
@@ -80,6 +80,15 @@
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
}
]
},
@@ -93,10 +102,24 @@
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
- "cardinality-out": "R{[0-9.]+}",
- "avg-tuple-size": "R{[0-9.]+}",
- "min-tuple-size": "R{[0-9.]+}",
- "max-tuple-size": "R{[0-9.]+}"
+ "cardinality-out": 10,
+ "avg-tuple-size": 140,
+ "min-tuple-size": 137,
+ "max-tuple-size": 151
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": 5,
+ "avg-tuple-size": 145,
+ "min-tuple-size": 142,
+ "max-tuple-size": 151
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}"
},
{
"name": "R{.+}",
@@ -111,17 +134,30 @@
},
{
"name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": 5,
+ "avg-tuple-size": 16,
+ "min-tuple-size": 14,
+ "max-tuple-size": 19
+ },
+ {
+ "name": "R{.+}",
"run-time": "R{5.+}",
"runtime-id": "R{.+}",
- "cardinality-out": "R{[0-9.]+}",
- "avg-tuple-size": "R{[0-9.]+}",
- "min-tuple-size": "R{[0-9.]+}",
- "max-tuple-size": "R{[0-9.]+}"
+ "cardinality-out": 5,
+ "avg-tuple-size": 161,
+ "min-tuple-size": 156,
+ "max-tuple-size": 170
},
{
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
- "runtime-id": "R{.+}"
+ "runtime-id": "R{.+}",
+ "cardinality-out": 1,
+ "avg-tuple-size": 0,
+ "min-tuple-size": 0,
+ "max-tuple-size": 0
}
]
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
index b024312..98d7930 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/sleep/sleep.5.regexjson
@@ -58,7 +58,16 @@
"attempt": "R{[0-9]+}",
"partition-send-profile": [],
"counters": [
- {
+ {
+ "name": "R{.+}",
+ "run-time": "R{5.+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": 3,
+ "avg-tuple-size": 75,
+ "min-tuple-size": 67,
+ "max-tuple-size": 82
+ },
+ {
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
@@ -69,7 +78,25 @@
},
{
"name": "R{.+}",
- "run-time": "R{5.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
"cardinality-out": "R{[0-9.]+}",
"avg-tuple-size": "R{[0-9.]+}",
@@ -153,6 +180,33 @@
"name": "R{.+}",
"run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
+ "cardinality-out": "R{[0-9.]+}",
+ "avg-tuple-size": "R{[0-9.]+}",
+ "min-tuple-size": "R{[0-9.]+}",
+ "max-tuple-size": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "run-time": "R{[0-9.]+}",
+ "runtime-id": "R{.+}",
"pages-read": "R{[0-9.]+}",
"pages-read-cold": "R{[0-9.]+}",
"cardinality-out": "R{[0-9.]+}",
@@ -162,7 +216,7 @@
},
{
"name": "R{.+}",
- "run-time": "R{[0-9].+}",
+ "run-time": "R{[0-9.]+}",
"runtime-id": "R{.+}",
"cardinality-out": "R{[0-9.]+}",
"avg-tuple-size": "R{[0-9.]+}",
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java
index 2347fa5..f22f5da 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ClosedRecordConstructorEvalFactory.java
@@ -46,6 +46,11 @@
}
@Override
+ public String toString() {
+ return "ClosedRecordConstructor";
+ }
+
+ @Override
public IScalarEvaluator createScalarEvaluator(IEvaluatorContext ctx) throws HyracksDataException {
int n = args.length / 2;
IScalarEvaluator[] evalFields = new IScalarEvaluator[n];
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
index f712bdd..6c7f2f0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
@@ -19,6 +19,8 @@
package org.apache.asterix.runtime.operators;
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@@ -85,7 +87,7 @@
writer.open();
IStatsCollector coll = ctx.getStatsCollector();
if (coll != null) {
- coll.add(new OperatorStats(operatorName));
+ coll.add(new OperatorStats(operatorName, INVALID_ODID));
}
INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
indexesStats = new HashMap<>();
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 85910aa..a44e1be 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.compiler.api;
+import java.util.EnumSet;
import java.util.List;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -47,6 +48,7 @@
import org.apache.hyracks.algebricks.runtime.writers.SerializedDataWriterFactory;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobSpecification;
public class HeuristicCompilerFactoryBuilder extends AbstractCompilerFactoryBuilder {
@@ -172,6 +174,17 @@
}
@Override
+ public JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobEventListenerFactory,
+ EnumSet<JobFlag> runtimeFlags) throws AlgebricksException {
+ AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting Job Generation.\n");
+ PlanCompiler pc = factory.createPlanCompiler(oc, appContext, writerFactory);
+ if (runtimeFlags.contains(JobFlag.PROFILE_RUNTIME)) {
+ pc.enableLog2PhysMapping();
+ }
+ return pc.compilePlan(plan, jobEventListenerFactory);
+ }
+
+ @Override
public boolean skipJobCapacityAssignment() {
return oc.skipJobCapacityAssignment();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java
index 0f5798e..7de0adb 100644
--- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java
+++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompiler.java
@@ -18,8 +18,11 @@
*/
package org.apache.hyracks.algebricks.compiler.api;
+import java.util.EnumSet;
+
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobSpecification;
public interface ICompiler {
@@ -28,5 +31,8 @@
public JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobEventListenerFactory)
throws AlgebricksException;
+ JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobletEventListenerFactory,
+ EnumSet<JobFlag> runtimeFlags) throws AlgebricksException;
+
boolean skipJobCapacityAssignment();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 3e4e09f..d1a356c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -25,6 +25,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -80,7 +81,7 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
-import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.exceptions.ErrorCode;
import com.fasterxml.jackson.core.JsonFactory;
@@ -105,7 +106,7 @@
private static final String OPTIMIZER_ESTIMATES = "optimizer-estimates";
private final Map<AbstractLogicalOperator, String> operatorIdentity = new HashMap<>();
private Map<Object, String> log2odid = Collections.emptyMap();
- private Map<String, ProfileInfo> profile = Collections.emptyMap();
+ private Map<String, OperatorProfile> profile = Collections.emptyMap();
private final IdCounter idCounter = new IdCounter();
private final JsonGenerator jsonGenerator;
@@ -153,15 +154,88 @@
}
}
- private class ProfileInfo {
- Map<Integer, Pair<Double, Double>> activities;
+ private class ExtendedActivityId {
+ private final OperatorDescriptorId odId;
+ private final int id;
+ private final int microId;
+ private final int subPipe;
+ private final int subId;
- ProfileInfo() {
+ ExtendedActivityId(String str) {
+ if (str.startsWith("ANID:")) {
+ str = str.substring(5);
+ int idIdx = str.lastIndexOf(':');
+ this.odId = OperatorDescriptorId.parse(str.substring(0, idIdx));
+ String[] parts = str.substring(idIdx + 1).split("\\.");
+ this.id = Integer.parseInt(parts[0]);
+ if (parts.length >= 2) {
+ this.microId = Integer.parseInt(parts[1]);
+ } else {
+ this.microId = -1;
+ }
+ if (parts.length >= 4) {
+ this.subPipe = Integer.parseInt(parts[2]);
+ this.subId = Integer.parseInt(parts[3]);
+ } else {
+ this.subPipe = -1;
+ this.subId = -1;
+ }
+ } else {
+ throw new IllegalArgumentException("Unable to parse: " + str);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(values());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof ExtendedActivityId) && Objects.equals(((ExtendedActivityId) o).values(), values());
+ }
+
+ private List<?> values() {
+ return List.of(odId, id, microId, subPipe, subId);
+ }
+
+ @Override
+ public String toString() {
+ return "ANID:" + odId + ":" + getLocalId();
+ }
+
+ private void catenateId(StringBuilder sb, int i) {
+ if (sb.length() == 0) {
+ sb.append(i);
+ return;
+ }
+ sb.append(".");
+ sb.append(i);
+ }
+
+ public String getLocalId() {
+ StringBuilder sb = new StringBuilder();
+ catenateId(sb, odId.getId());
+ if (microId > 0) {
+ catenateId(sb, microId);
+ }
+ if (subId > 0) {
+ catenateId(sb, subPipe);
+ catenateId(sb, subId);
+ }
+ return sb.toString();
+ }
+ }
+
+ private class OperatorProfile {
+ Map<String, Pair<Double, Double>> activities;
+
+ OperatorProfile() {
activities = new HashMap<>();
}
- void visit(int id, double time) {
- Pair<Double, Double> times = activities.computeIfAbsent(id, i -> new Pair(time, time));
+ void updateOperator(String extendedOpId, double time) {
+ Pair<Double, Double> times = activities.computeIfAbsent(extendedOpId, i -> new Pair(time, time));
if (times.getFirst() > time) {
times.setFirst(time);
}
@@ -171,26 +245,27 @@
}
}
- private static ActivityId acIdFromName(String name) {
+ private ExtendedActivityId acIdFromName(String name) {
String[] parts = name.split(" - ");
- return ActivityId.parse(parts[0]);
+ return new ExtendedActivityId(parts[0]);
}
- Map<String, ProfileInfo> processProfile(ObjectNode profile) {
- Map<String, ProfileInfo> profiledOps = new HashMap<>();
+ Map<String, OperatorProfile> processProfile(ObjectNode profile) {
+ Map<String, OperatorProfile> profiledOps = new HashMap<>();
for (JsonNode joblet : profile.get("joblets")) {
for (JsonNode task : joblet.get("tasks")) {
for (JsonNode counters : task.get("counters")) {
- ProfileInfo info =
- profiledOps.computeIfAbsent(counters.get("runtime-id").asText(), i -> new ProfileInfo());
- info.visit(acIdFromName(counters.get("name").asText()).getLocalId(),
+ OperatorProfile info = profiledOps.computeIfAbsent(counters.get("runtime-id").asText(),
+ i -> new OperatorProfile());
+ info.updateOperator(acIdFromName(counters.get("name").asText()).getLocalId(),
counters.get("run-time").asDouble());
}
for (JsonNode partition : task.get("partition-send-profile")) {
String id = partition.get("partition-id").get("connector-id").asText();
- ProfileInfo info = profiledOps.computeIfAbsent(id, i -> new ProfileInfo());
+ OperatorProfile info = profiledOps.computeIfAbsent(id, i -> new OperatorProfile());
//CDIDs are unique
- info.visit(0, partition.get("close-time").asDouble() - partition.get("open-time").asDouble());
+ info.updateOperator("0",
+ partition.get("close-time").asDouble() - partition.get("open-time").asDouble());
}
}
}
@@ -266,15 +341,16 @@
String od = log2odid.get(op);
if (od != null) {
jsonGenerator.writeStringField("runtime-id", od);
- ProfileInfo info = profile.get(od);
+ OperatorProfile info = profile.get(od);
if (info != null) {
if (info.activities.size() == 1) {
- jsonGenerator.writeNumberField("min-time", info.activities.get(0).first);
- jsonGenerator.writeNumberField("max-time", info.activities.get(0).second);
+ Pair<Double, Double> minMax = info.activities.values().iterator().next();
+ jsonGenerator.writeNumberField("min-time", minMax.first);
+ jsonGenerator.writeNumberField("max-time", minMax.second);
} else {
jsonGenerator.writeObjectFieldStart("times");
- for (Map.Entry<Integer, Pair<Double, Double>> ac : info.activities.entrySet()) {
- jsonGenerator.writeObjectFieldStart(ac.getKey().toString());
+ for (Map.Entry<String, Pair<Double, Double>> ac : info.activities.entrySet()) {
+ jsonGenerator.writeObjectFieldStart(ac.getKey());
jsonGenerator.writeNumberField("min-time", ac.getValue().first);
jsonGenerator.writeNumberField("max-time", ac.getValue().second);
jsonGenerator.writeEndObject();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index d8bf190..ca26515 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -27,6 +27,7 @@
import java.util.Map;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -38,6 +39,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
@@ -72,6 +74,8 @@
private int aodCounter = 0;
+ private boolean genLog2PhysMap = false;
+
public JobBuilder(JobSpecification jobSpec, AlgebricksAbsolutePartitionConstraint clusterLocations) {
this.jobSpec = jobSpec;
this.clusterLocations = clusterLocations;
@@ -94,6 +98,10 @@
new String[] { clusterLocations.getLocations()[Math.abs(jobSpec.hashCode() % nPartitions)] });
}
+ public void enableLog2PhysMapping() {
+ this.genLog2PhysMap = true;
+ }
+
@Override
public void contributeMicroOperator(ILogicalOperator op, IPushRuntimeFactory runtime, RecordDescriptor recDesc) {
contributeMicroOperator(op, runtime, recDesc, null);
@@ -148,11 +156,39 @@
hyracksOps.put(op, opDesc);
}
+ private String getExtendedOdidForMetaOp(ILogicalOperator op, int k) {
+ String base = metaAsterixOps.get(k).getOperatorId().toString();
+ Pair<IPushRuntimeFactory, RecordDescriptor> fact = microOps.get(op);
+ List<Pair<IPushRuntimeFactory, RecordDescriptor>> metaOpPipeline = metaAsterixOpSkeletons.get(k);
+ int pos = metaOpPipeline.indexOf(fact);
+ return base + "." + pos;
+ }
+
+ private void getExtendedOdidForSubplanOp(SubplanOperator op, Map<ILogicalOperator, String> log2phys) {
+ String baseId = getExtendedOdidForMetaOp(op, algebraicOpBelongingToMetaAsterixOp.get(op));
+ op.getNestedPlans().forEach(plan -> plan.getRoots()
+ .forEach(root -> getExtendedOdidForOperator(baseId, root.getValue(), log2phys, 0)));
+ }
+
+ private int getExtendedOdidForOperator(String baseId, ILogicalOperator op, Map<ILogicalOperator, String> log2phys,
+ int input) {
+ List<Mutable<ILogicalOperator>> inputs = op.getInputs();
+ List<Integer> paths = new ArrayList<>(inputs.size());
+ for (int i = 0; i < inputs.size(); i++) {
+ ILogicalOperator nextOp = inputs.get(i).getValue();
+ paths.add(i, getExtendedOdidForOperator(baseId, nextOp, log2phys, i + input));
+ }
+ int lPath = paths.size() > 0 ? Collections.max(paths) : 0;
+ log2phys.put(op, baseId + "." + input + "." + lPath);
+ return lPath + 1;
+ }
+
public Map<Object, String> getLogical2PhysicalMap() {
Map<ILogicalOperator, String> mergedOperatorMap = new HashMap<>();
hyracksOps.forEach(((k, v) -> mergedOperatorMap.put(k, v.getOperatorId().toString())));
- algebraicOpBelongingToMetaAsterixOp
- .forEach((k, v) -> mergedOperatorMap.put(k, metaAsterixOps.get(v).getOperatorId().toString()));
+ algebraicOpBelongingToMetaAsterixOp.forEach((k, v) -> mergedOperatorMap.put(k, getExtendedOdidForMetaOp(k, v)));
+ microOps.keySet().stream().filter(op -> op instanceof SubplanOperator)
+ .forEach(op -> getExtendedOdidForSubplanOp((SubplanOperator) op, mergedOperatorMap));
connectors.forEach((k, v) -> mergedOperatorMap.put(k, v.getFirst().getConnectorId().toString()));
return Collections.unmodifiableMap(mergedOperatorMap);
}
@@ -184,7 +220,11 @@
jobSpec.addRoot(opDesc);
}
setAllPartitionConstraints(tgtConstraints);
- jobSpec.setLogical2PhysicalMap(getLogical2PhysicalMap());
+ if (genLog2PhysMap) {
+ jobSpec.setLogical2PhysicalMap(getLogical2PhysicalMap());
+ } else {
+ jobSpec.setLogical2PhysicalMap(Collections.emptyMap());
+ }
}
public List<IOperatorDescriptor> getGeneratedMetaOps() {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index 2ef5c6c..f8bdfa5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -39,6 +39,8 @@
private JobGenContext context;
private Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> operatorVisitedToParents = new HashMap<>();
+ boolean genLog2PhysMap = false;
+
public PlanCompiler(JobGenContext context) {
this.context = context;
}
@@ -47,6 +49,10 @@
return context;
}
+ public void enableLog2PhysMapping() {
+ this.genLog2PhysMap = true;
+ }
+
public JobSpecification compilePlan(ILogicalPlan plan, IJobletEventListenerFactory jobEventListenerFactory)
throws AlgebricksException {
return compilePlanImpl(plan, false, null, jobEventListenerFactory);
@@ -66,6 +72,9 @@
}
List<ILogicalOperator> rootOps = new ArrayList<>();
JobBuilder builder = new JobBuilder(spec, context.getClusterLocations());
+ if (genLog2PhysMap) {
+ builder.enableLog2PhysMapping();
+ }
for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
compileOpRef(opRef, spec, builder, outerPlanSchema);
rootOps.add(opRef.getValue());
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
index 82b6f9c..b1bd46b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.exceptions.SourceLocation;
public interface IPushRuntimeFactory extends Serializable {
+
IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException;
default void setSourceLocation(SourceLocation sourceLoc) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
new file mode 100644
index 0000000..354f172
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.base;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.dataflow.ITimedWriter;
+import org.apache.hyracks.api.dataflow.ProfiledFrameWriter;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+
+public class ProfiledPushRuntime extends ProfiledFrameWriter implements IPushRuntime {
+
+ private final IPushRuntime wrapped;
+ private final IOperatorStats stats;
+
+ private final boolean last;
+
+ private final Map<Integer, ITimedWriter> outputs;
+
+ public ProfiledPushRuntime(IPushRuntime push, IOperatorStats stats, boolean last) {
+ super(push);
+ outputs = new HashMap<>();
+ this.wrapped = push;
+ this.stats = stats;
+ this.last = last;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ super.close();
+ long ownTime = getTotalTime();
+ //for micro union all. accumulate the time of each input into the counter.
+ //then, on input 0, subtract the output from the accumulated time.
+ if (!last) {
+ stats.getTimeCounter().update(ownTime);
+ return;
+ }
+ ownTime += stats.getTimeCounter().get();
+ for (ITimedWriter w : outputs.values()) {
+ ownTime -= w.getTotalTime();
+ }
+ stats.getTimeCounter().set(ownTime);
+ }
+
+ public IOperatorStats getStats() {
+ return stats;
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ if (writer instanceof ITimedWriter) {
+ ITimedWriter wrapper = (ITimedWriter) writer;
+ wrapper.setUpstreamStats(stats);
+ outputs.put(index, wrapper);
+ }
+ wrapped.setOutputFrameWriter(index, writer, recordDesc);
+ }
+
+ @Override
+ public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+ wrapped.setInputRecordDescriptor(index, recordDescriptor);
+ }
+
+ public static IPushRuntime time(IPushRuntime push, IOperatorStats stats, boolean last) throws HyracksDataException {
+ if (!(push instanceof ProfiledPushRuntime)) {
+ return new ProfiledPushRuntime(push, stats, last);
+ } else {
+ return push;
+ }
+ }
+
+ public static IPushRuntime time(IPushRuntime push, IOperatorStats stats) throws HyracksDataException {
+ return time(push, stats, true);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index b3fef7f..2a3fa7e 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -19,26 +19,34 @@
package org.apache.hyracks.algebricks.runtime.operators.meta;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.ITimedWriter;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
+import org.apache.hyracks.api.job.profiling.OperatorStats;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 3L;
// array of factories for building the local runtime pipeline
private final AlgebricksPipeline pipeline;
@@ -85,6 +93,68 @@
}
}
+ private static String makeStatName(String base, String name, int pos, int input, int subPlan, int subPos) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(base);
+ sb.append(".");
+ sb.append(pos);
+ if (subPlan >= 0) {
+ sb.append(".");
+ sb.append(subPlan);
+ sb.append(".");
+ sb.append(subPos);
+ sb.append(" - Subplan ");
+ } else {
+ sb.append(" - MicroOp ");
+ }
+ sb.append(name);
+ if (input >= 0) {
+ sb.append(" input [");
+ sb.append(input);
+ sb.append("] ");
+ }
+ return sb.toString();
+ }
+
+ private static String makeId(String base, int id, int subPlan, int subPos) {
+ return base + "." + id + (subPlan >= 0 ? "." + subPlan : "") + (subPos >= 0 ? "." + subPos : "");
+ }
+
+ private static IOperatorStats makeStatForRuntimeFact(IPushRuntimeFactory factory, String base, String baseId,
+ int pos, int subPlan, int subPos) {
+ return new OperatorStats(makeStatName(base, factory.toString(), pos, -1, subPlan, subPos),
+ makeId(baseId, pos, subPlan, subPos));
+ }
+
+ public static Map<IPushRuntimeFactory, IOperatorStats> makeMicroOpStats(AlgebricksPipeline pipe,
+ IOperatorStats outerStats) {
+ Map<IPushRuntimeFactory, IOperatorStats> microOpStats = new HashMap<>();
+ String baseName = outerStats.getName().split(" - ")[0];
+ String baseId = outerStats.getOperatorId();
+ List<SubplanRuntimeFactory> subplans = new ArrayList<>();
+ for (int i = 0; i < pipe.getRuntimeFactories().length; i++) {
+ IPushRuntimeFactory fact = pipe.getRuntimeFactories()[i];
+ //TODO: don't use instanceof
+ if (fact instanceof SubplanRuntimeFactory) {
+ SubplanRuntimeFactory subplanFact = (SubplanRuntimeFactory) fact;
+ subplans.add(subplanFact);
+ List<AlgebricksPipeline> pipelines = subplanFact.getPipelines();
+ for (AlgebricksPipeline p : pipelines) {
+ IPushRuntimeFactory[] subplanFactories = p.getRuntimeFactories();
+ for (int j = subplanFactories.length - 1; j > 0; j--) {
+ microOpStats.put(subplanFactories[j], makeStatForRuntimeFact(subplanFactories[j], baseName,
+ baseId, i, pipelines.indexOf(p), j));
+ }
+ }
+ }
+ microOpStats.put(fact, makeStatForRuntimeFact(fact, baseName, baseId, i, -1, -1));
+ }
+ for (SubplanRuntimeFactory sub : subplans) {
+ sub.setStats(microOpStats);
+ }
+ return microOpStats;
+ }
+
private class SourcePushRuntime extends AbstractUnaryOutputSourceOperatorNodePushable {
private final IHyracksTaskContext ctx;
@@ -99,7 +169,7 @@
outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.outRecDescs[0] : null;
PipelineAssembler pa =
new PipelineAssembler(pipeline, inputArity, outputArity, null, pipelineOutputRecordDescriptor);
- startOfPipeline = pa.assemblePipeline(writer, ctx);
+ startOfPipeline = pa.assemblePipeline(writer, ctx, new HashMap<>());
HyracksDataException exception = null;
try {
startOfPipeline.open();
@@ -126,16 +196,18 @@
public String getDisplayName() {
return "Empty Tuple Source";
}
+
}
private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider) {
- return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ return new AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable() {
private IFrameWriter startOfPipeline;
private boolean opened = false;
+ private IOperatorStats parentStats = NoOpOperatorStats.INSTANCE;
+ private Map<IPushRuntimeFactory, IOperatorStats> microOpStats = new HashMap<>();
- @Override
public void open() throws HyracksDataException {
if (startOfPipeline == null) {
RecordDescriptor pipelineOutputRecordDescriptor =
@@ -144,7 +216,7 @@
.getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
- startOfPipeline = pa.assemblePipeline(writer, ctx);
+ startOfPipeline = pa.assemblePipeline(writer, ctx, microOpStats);
}
opened = true;
startOfPipeline.open();
@@ -175,9 +247,38 @@
}
@Override
+ public void deinitialize() throws HyracksDataException {
+ super.deinitialize();
+ }
+
+ @Override
public String toString() {
return AlgebricksMetaOperatorDescriptor.this.toString();
}
+
+ @Override
+ public void addStats(IOperatorStats stats) throws HyracksDataException {
+ microOpStats = makeMicroOpStats(pipeline, stats);
+ for (IOperatorStats stat : microOpStats.values()) {
+ ctx.getStatsCollector().add(stat);
+ }
+ }
+
+ @Override
+ public void setUpstreamStats(IOperatorStats stats) {
+ parentStats = stats;
+ }
+
+ @Override
+ public long getTotalTime() {
+ return startOfPipeline instanceof ITimedWriter ? ((ITimedWriter) startOfPipeline).getTotalTime() : 0;
+ }
+
+ @Override
+ public IOperatorStats getStats() {
+ IPushRuntimeFactory[] facts = pipeline.getRuntimeFactories();
+ return microOpStats.getOrDefault(facts[facts.length - 1], NoOpOperatorStats.INSTANCE);
+ }
};
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 858fcfa..202c087 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -25,6 +25,8 @@
import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
@@ -32,6 +34,7 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
public class PipelineAssembler {
@@ -55,6 +58,11 @@
}
public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException {
+ return assemblePipeline(writer, ctx, new HashMap<>());
+ }
+
+ public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx,
+ Map<IPushRuntimeFactory, IOperatorStats> microOpStats) throws HyracksDataException {
// should enforce protocol
boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
@@ -67,7 +75,21 @@
IPushRuntimeFactory runtimeFactory = runtimeFactories[i];
IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx);
for (int j = 0; j < newRuntimes.length; j++) {
- if (enforce) {
+ //ETS is wrapped externally, and doesn't need the micro-op wrapper since it isn't a pipeline
+ //we also want to avoid any instances of NoOp stats in the pipeline that snuck in somehow
+ boolean shouldProfile = profile && !(runtimeFactory instanceof EmptyTupleSourceRuntimeFactory)
+ && microOpStats.containsKey(runtimeFactory);
+ if (shouldProfile) {
+ ProfiledPushRuntime profiled;
+ if (j == 0) {
+ profiled = (ProfiledPushRuntime) ProfiledPushRuntime.time(newRuntimes[j],
+ microOpStats.get(runtimeFactory));
+ } else {
+ profiled = (ProfiledPushRuntime) ProfiledPushRuntime.time(newRuntimes[j],
+ microOpStats.get(runtimeFactory), false);
+ }
+ newRuntimes[j] = profiled;
+ } else if (enforce && !profile) {
newRuntimes[j] = EnforcePushRuntime.enforce(newRuntimes[j]);
}
if (i == runtimeFactories.length - 1) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 349e50f..7feca3c 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -20,7 +20,9 @@
import java.io.DataOutput;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
@@ -32,16 +34,19 @@
import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ProfiledFrameWriter;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
public class SubplanRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 3L;
private final List<AlgebricksPipeline> pipelines;
@@ -51,6 +56,8 @@
private final IMissingWriterFactory[] missingWriterFactories;
+ private final Map<IPushRuntimeFactory, IOperatorStats> stats;
+
public SubplanRuntimeFactory(List<AlgebricksPipeline> pipelines, IMissingWriterFactory[] missingWriterFactories,
RecordDescriptor inputRecordDesc, RecordDescriptor outputRecordDesc, int[] projectionList) {
super(projectionList);
@@ -61,6 +68,7 @@
if (projectionList != null) {
throw new NotImplementedException();
}
+ this.stats = new HashMap<>();
}
@Override
@@ -78,6 +86,14 @@
return sb.toString();
}
+ public List<AlgebricksPipeline> getPipelines() {
+ return pipelines;
+ }
+
+ public void setStats(Map<IPushRuntimeFactory, IOperatorStats> stats) {
+ this.stats.putAll(stats);
+ }
+
@Override
public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws HyracksDataException {
@@ -92,8 +108,11 @@
boolean first;
+ boolean profile;
+
SubplanPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
this.ctx = ctx;
+ this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
this.first = true;
IMissingWriter[] missingWriters = new IMissingWriter[missingWriterFactories.length];
@@ -114,6 +133,11 @@
if (i == 0) {
// primary pipeline
outputWriter = new TupleOuterProduct(pipelineLastRecordDescriptor, missingWriters);
+ //this is necessary to track the output of the last operator to the outer product,
+ //i.e. the last real operator in pipeline 0 of the subplan
+ if (profile) {
+ outputWriter = new ProfiledFrameWriter(outputWriter);
+ }
outputRecordDescriptor = SubplanRuntimeFactory.this.outputRecordDesc;
} else {
// secondary pipeline
@@ -127,7 +151,8 @@
}
PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor);
- startOfPipelines[i] = (NestedTupleSourceRuntime) pa.assemblePipeline(outputWriter, ctx);
+ IFrameWriter head = pa.assemblePipeline(outputWriter, ctx, stats);
+ startOfPipelines[i] = (NestedTupleSourceRuntime) head;
pipelineAssemblers[i] = pa;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ISelfProfilingNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ISelfProfilingNodePushable.java
new file mode 100644
index 0000000..7402179
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ISelfProfilingNodePushable.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.dataflow;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+
+public interface ISelfProfilingNodePushable extends IStatsContainingNodePushable {
+ void addStats(IOperatorStats stats) throws HyracksDataException;
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IStatsContainingNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IStatsContainingNodePushable.java
new file mode 100644
index 0000000..96a3ae9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IStatsContainingNodePushable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+
+public interface IStatsContainingNodePushable {
+ IOperatorStats getStats();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java
new file mode 100644
index 0000000..7b0f8c8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ITimedWriter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+
+public interface ITimedWriter extends IFrameWriter {
+ void setUpstreamStats(IOperatorStats stats);
+
+ long getTotalTime();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
index dc53bca..cfb0e7b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.api.dataflow;
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
import java.nio.ByteBuffer;
import org.apache.hyracks.api.com.job.profiling.counters.Counter;
@@ -28,30 +30,33 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
import org.apache.hyracks.api.job.profiling.OperatorStats;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
import org.apache.hyracks.api.util.HyracksRunnable;
import org.apache.hyracks.api.util.HyracksThrowingConsumer;
import org.apache.hyracks.util.IntSerDeUtils;
-public class ProfiledFrameWriter implements IFrameWriter {
+public class ProfiledFrameWriter implements ITimedWriter {
// The downstream data consumer of this writer.
private final IFrameWriter writer;
- private final ICounter tupleCounter;
- private final IOperatorStats parentStats;
+ protected IOperatorStats upstreamStats = NoOpOperatorStats.INSTANCE;
private int minSz = Integer.MAX_VALUE;
private int maxSz = -1;
private long avgSz;
private ICounter totalTime;
- public ProfiledFrameWriter(IFrameWriter writer, IOperatorStats parentStats) {
+ public ProfiledFrameWriter(IFrameWriter writer) {
this.writer = writer;
- this.parentStats = parentStats;
- this.tupleCounter = parentStats != null ? parentStats.getTupleCounter() : null;
this.totalTime = new Counter("totalTime");
}
+ @Override
+ public void setUpstreamStats(IOperatorStats stats) {
+ this.upstreamStats = stats;
+ }
+
public static void timeMethod(HyracksRunnable r, ICounter c) throws HyracksDataException {
long nt = 0;
try {
@@ -80,25 +85,24 @@
private void updateTupleStats(ByteBuffer buffer) {
int tupleCountOffset = FrameHelper.getTupleCountOffset(buffer.limit());
int tupleCount = IntSerDeUtils.getInt(buffer.array(), tupleCountOffset);
- if (tupleCounter != null) {
- long prevCount = tupleCounter.get();
- for (int i = 0; i < tupleCount; i++) {
- int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
- if (maxSz < tupleLen) {
- maxSz = tupleLen;
- }
- if (minSz > tupleLen) {
- minSz = tupleLen;
- }
- long prev = avgSz * prevCount;
- avgSz = (prev + tupleLen) / (prevCount + 1);
- prevCount++;
+ ICounter tupleCounter = upstreamStats.getTupleCounter();
+ long prevCount = tupleCounter.get();
+ for (int i = 0; i < tupleCount; i++) {
+ int tupleLen = getTupleLength(i, tupleCountOffset, buffer);
+ if (maxSz < tupleLen) {
+ maxSz = tupleLen;
}
- parentStats.getMaxTupleSz().set(maxSz);
- parentStats.getMinTupleSz().set(minSz);
- parentStats.getAverageTupleSz().set(avgSz);
- tupleCounter.update(tupleCount);
+ if (minSz > tupleLen) {
+ minSz = tupleLen;
+ }
+ long prev = avgSz * prevCount;
+ avgSz = (prev + tupleLen) / (prevCount + 1);
+ prevCount++;
}
+ upstreamStats.getMaxTupleSz().set(maxSz);
+ upstreamStats.getMinTupleSz().set(minSz);
+ upstreamStats.getAverageTupleSz().set(avgSz);
+ tupleCounter.update(tupleCount);
}
@Override
@@ -140,14 +144,15 @@
throws HyracksDataException {
if (!(writer instanceof ProfiledFrameWriter)) {
IStatsCollector statsCollector = ctx.getStatsCollector();
- IOperatorStats stats = new OperatorStats(name);
+ IOperatorStats stats = new OperatorStats(name, INVALID_ODID);
statsCollector.add(stats);
- return new ProfiledFrameWriter(writer, null);
-
- } else
+ return new ProfiledFrameWriter(writer);
+ } else {
return writer;
+ }
}
+ @Override
public long getTotalTime() {
return totalTime.get();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
index bde5611..1984d8e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
@@ -32,20 +32,16 @@
import org.apache.hyracks.api.job.profiling.counters.ICounter;
import org.apache.hyracks.api.rewriter.runtime.SuperActivityOperatorNodePushable;
-public class ProfiledOperatorNodePushable implements IOperatorNodePushable {
+public class ProfiledOperatorNodePushable implements IOperatorNodePushable, IStatsContainingNodePushable {
private final IOperatorNodePushable op;
- private final Map<Integer, ProfiledFrameWriter> inputs;
- private final Map<Integer, ProfiledOperatorNodePushable> parents;
- private final Map<Integer, ProfiledFrameWriter> outputs;
+ private final Map<Integer, ITimedWriter> inputs;
+ private final Map<Integer, ITimedWriter> outputs;
private final IOperatorStats stats;
private final ICounter totalTime;
- ProfiledOperatorNodePushable(IOperatorNodePushable op, IOperatorStats stats,
- ProfiledOperatorNodePushable parentOp) {
+ ProfiledOperatorNodePushable(IOperatorNodePushable op, IOperatorStats stats) {
this.stats = stats;
- this.parents = new HashMap<>();
- parents.put(0, parentOp);
this.op = op;
inputs = new HashMap<>();
outputs = new HashMap<>();
@@ -60,10 +56,10 @@
@Override
public void deinitialize() throws HyracksDataException {
long ownTime = totalTime.get();
- for (ProfiledFrameWriter i : inputs.values()) {
+ for (ITimedWriter i : inputs.values()) {
ownTime += i.getTotalTime();
}
- for (ProfiledFrameWriter w : outputs.values()) {
+ for (ITimedWriter w : outputs.values()) {
ownTime -= w.getTotalTime();
}
op.deinitialize();
@@ -78,8 +74,13 @@
@Override
public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
throws HyracksDataException {
- if (writer instanceof ProfiledFrameWriter) {
- ProfiledFrameWriter wrapper = (ProfiledFrameWriter) writer;
+ if (writer instanceof ITimedWriter) {
+ ITimedWriter wrapper = (ITimedWriter) writer;
+ if (op instanceof ISelfProfilingNodePushable) {
+ wrapper.setUpstreamStats(((ISelfProfilingNodePushable) op).getStats());
+ } else {
+ wrapper.setUpstreamStats(stats);
+ }
outputs.put(index, wrapper);
}
op.setOutputFrameWriter(index, writer, recordDesc);
@@ -88,8 +89,7 @@
@Override
public IFrameWriter getInputFrameWriter(int index) {
if (inputs.get(index) == null) {
- IOperatorStats parentStats = parents.get(index) == null ? null : parents.get(index).getStats();
- ProfiledFrameWriter pfw = new ProfiledFrameWriter(op.getInputFrameWriter(index), parentStats);
+ ProfiledFrameWriter pfw = new ProfiledFrameWriter(op.getInputFrameWriter(index));
inputs.put(index, pfw);
return pfw;
} else {
@@ -102,25 +102,26 @@
return op.getDisplayName();
}
- public void addParent(int index, ProfiledOperatorNodePushable parent) {
- parents.put(index, parent);
- }
-
public IOperatorStats getStats() {
return stats;
}
- public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx, ActivityId acId,
- ProfiledOperatorNodePushable source) throws HyracksDataException {
+ public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx, ActivityId acId)
+ throws HyracksDataException {
String name = acId.toString() + " - " + op.getDisplayName();
IStatsCollector statsCollector = ctx.getStatsCollector();
- IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId());
- statsCollector.add(stats);
+ IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId().toString());
+ if (!(op instanceof ISelfProfilingNodePushable)) {
+ statsCollector.add(stats);
+ }
if (op instanceof IIntrospectingOperator) {
((IIntrospectingOperator) op).setOperatorStats(stats);
}
+ if (op instanceof ISelfProfilingNodePushable) {
+ ((ISelfProfilingNodePushable) op).addStats(stats);
+ }
if (!(op instanceof ProfiledOperatorNodePushable) && !(op instanceof SuperActivityOperatorNodePushable)) {
- return new ProfiledOperatorNodePushable(op, stats, source);
+ return new ProfiledOperatorNodePushable(op, stats);
}
return op;
}
@@ -129,7 +130,7 @@
throws HyracksDataException {
String name = acId.toString() + " - " + op.getDisplayName();
IStatsCollector statsCollector = ctx.getStatsCollector();
- IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId());
+ IOperatorStats stats = new OperatorStats(name, acId.getOperatorDescriptorId().toString());
if (op instanceof IIntrospectingOperator) {
((IIntrospectingOperator) op).setOperatorStats(stats);
statsCollector.add(stats);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
index 26b3a58..7770c4f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
@@ -18,10 +18,11 @@
*/
package org.apache.hyracks.api.job.profiling;
+import java.io.DataInput;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.io.IWritable;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
@@ -86,11 +87,22 @@
ICounter getBytesWritten();
- OperatorDescriptorId getId();
+ String getOperatorId();
void updateIndexesStats(Map<String, IndexStats> indexesStats);
Map<String, IndexStats> getIndexesStats();
void updateFrom(IOperatorStats stats);
+
+ static IOperatorStats create(DataInput input) throws IOException {
+ String name = input.readUTF();
+ if (NoOpOperatorStats.NOOP_NAME.equals(name)) {
+ return NoOpOperatorStats.INSTANCE;
+ }
+ String operatorId = input.readUTF();
+ OperatorStats operatorStats = new OperatorStats(name, operatorId);
+ operatorStats.readFields(input);
+ return operatorStats;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java
index 4c86a15..d427d14 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java
@@ -24,14 +24,15 @@
import java.util.Collections;
import java.util.Map;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
public class NoOpOperatorStats implements IOperatorStats {
- private static final long serialVersionUID = 9055940822300360135L;
+ private static final long serialVersionUID = 9055940222300360256L;
public static final NoOpOperatorStats INSTANCE = new NoOpOperatorStats();
+ public static final String INVALID_ODID = "ODID:-1";
+ public static final String NOOP_NAME = "NoOp";
private static final ICounter NOOP_COUNTER = new ICounter() {
private static final long serialVersionUID = 1L;
@@ -57,21 +58,20 @@
}
};
- private static final OperatorDescriptorId INVALID_ODID = new OperatorDescriptorId(-1);
-
@Override
public void writeFields(DataOutput output) throws IOException {
-
+ output.writeUTF(NOOP_NAME);
+ output.writeUTF(INVALID_ODID);
}
@Override
public void readFields(DataInput input) throws IOException {
-
+ // nothing
}
@Override
public String getName() {
- return "";
+ return NOOP_NAME;
}
@Override
@@ -130,7 +130,7 @@
}
@Override
- public OperatorDescriptorId getId() {
+ public String getOperatorId() {
return INVALID_ODID;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
index c9f08e0..412b788 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
@@ -25,14 +25,13 @@
import java.util.Map;
import org.apache.hyracks.api.com.job.profiling.counters.Counter;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
public class OperatorStats implements IOperatorStats {
- private static final long serialVersionUID = 6401830963367567169L;
-
+ private static final long serialVersionUID = 6401830963361567126L;
public final String operatorName;
- public OperatorDescriptorId id;
+
+ public final String operatorId;
public final ICounter tupleCounter;
public final ICounter timeCounter;
public final ICounter pageReads;
@@ -48,12 +47,12 @@
//TODO: this is quickly becoming gross it should just be a map where the value is obliged to be a Counter
- public OperatorStats(String operatorName, OperatorDescriptorId id) {
+ public OperatorStats(String operatorName, String operatorId) {
if (operatorName == null || operatorName.isEmpty()) {
throw new IllegalArgumentException("operatorName must not be null or empty");
}
this.operatorName = operatorName;
- this.id = id;
+ this.operatorId = operatorId;
tupleCounter = new Counter("tupleCounter");
timeCounter = new Counter("timeCounter");
pageReads = new Counter("diskIoCounter");
@@ -69,17 +68,6 @@
indexesStats = new HashMap<>();
}
- public OperatorStats(String operatorName) {
- this(operatorName, new OperatorDescriptorId(-1));
- }
-
- public static IOperatorStats create(DataInput input) throws IOException {
- String name = input.readUTF();
- OperatorStats operatorStats = new OperatorStats(name);
- operatorStats.readFields(input);
- return operatorStats;
- }
-
@Override
public String getName() {
return operatorName;
@@ -141,8 +129,8 @@
}
@Override
- public OperatorDescriptorId getId() {
- return id;
+ public String getOperatorId() {
+ return operatorId;
}
@Override
@@ -178,7 +166,7 @@
@Override
public void writeFields(DataOutput output) throws IOException {
output.writeUTF(operatorName);
- id.writeFields(output);
+ output.writeUTF(operatorId);
output.writeLong(tupleCounter.get());
output.writeLong(timeCounter.get());
output.writeLong(pageReads.get());
@@ -195,7 +183,6 @@
@Override
public void readFields(DataInput input) throws IOException {
- id = OperatorDescriptorId.create(input);
tupleCounter.set(input.readLong());
timeCounter.set(input.readLong());
pageReads.set(input.readLong());
@@ -229,13 +216,13 @@
@Override
public String toString() {
- return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"" + tupleCounter.getName() + "\": "
- + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": " + timeCounter.get() + ", \""
- + coldReadCounter.getName() + "\": " + coldReadCounter.get() + avgTupleSz.getName() + "\": "
- + avgTupleSz.get() + ", \"" + minTupleSz.getName() + "\": " + minTupleSz.get() + ", \""
- + minTupleSz.getName() + "\": " + timeCounter.get() + ", \"" + inputTupleCounter.getName() + "\": "
- + bytesRead.get() + ", \"" + bytesRead.getName() + "\": " + bytesWritten.get() + ", \""
- + bytesWritten.getName() + "\": " + inputTupleCounter.get() + ", \"" + level.getName() + "\": "
- + level.get() + ", \"indexStats\": \"" + indexesStats + "\" }";
+ return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"id\": \"" + operatorId + "\", " + "\""
+ + tupleCounter.getName() + "\": " + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": "
+ + timeCounter.get() + ", \"" + coldReadCounter.getName() + "\": " + coldReadCounter.get()
+ + avgTupleSz.getName() + "\": " + avgTupleSz.get() + ", \"" + minTupleSz.getName() + "\": "
+ + minTupleSz.get() + ", \"" + minTupleSz.getName() + "\": " + timeCounter.get() + ", \""
+ + inputTupleCounter.getName() + "\": " + bytesRead.get() + ", \"" + bytesRead.getName() + "\": "
+ + bytesWritten.get() + ", \"" + bytesWritten.getName() + "\": " + inputTupleCounter.get() + ", \""
+ + level.getName() + "\": " + level.get() + ", \"indexStats\": \"" + indexesStats + "\" }";
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 4261c1e..17f5cb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -109,11 +109,11 @@
*/
Set<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> sources = new HashSet<>();
for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
- IOperatorNodePushable opPushable = null;
+ IOperatorNodePushable opPushable;
if (profile) {
IOperatorNodePushable wrapped =
entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
- opPushable = ProfiledOperatorNodePushable.time(wrapped, ctx, entry.getKey(), null);
+ opPushable = ProfiledOperatorNodePushable.time(wrapped, ctx, entry.getKey());
} else {
opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
ProfiledOperatorNodePushable.onlyAddStats(opPushable, ctx, entry.getKey());
@@ -147,12 +147,7 @@
if (profile) {
IOperatorNodePushable wrapped = channel.getRight().getLeft().createPushRuntime(ctx,
recordDescProvider, partition, nPartitions);
- if (sourceOp instanceof ProfiledOperatorNodePushable) {
- destOp = ProfiledOperatorNodePushable.time(wrapped, ctx, destId,
- (ProfiledOperatorNodePushable) sourceOp);
- } else {
- destOp = ProfiledOperatorNodePushable.time(wrapped, ctx, destId, null);
- }
+ destOp = ProfiledOperatorNodePushable.time(wrapped, ctx, destId);
} else {
destOp = channel.getRight().getLeft().createPushRuntime(ctx, recordDescProvider, partition,
nPartitions);
@@ -160,12 +155,6 @@
}
operatorNodePushablesBFSOrder.add(destOp);
operatorNodePushables.put(destId, destOp);
- } else if (profile) {
- if (destOp instanceof ProfiledOperatorNodePushable
- && sourceOp instanceof ProfiledOperatorNodePushable) {
- ((ProfiledOperatorNodePushable) destOp).addParent(inputChannel,
- (ProfiledOperatorNodePushable) sourceOp);
- }
}
/*
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
index 26c664a..7cfbda0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.control.common.job.profiling;
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -31,7 +33,7 @@
import org.apache.hyracks.api.job.profiling.OperatorStats;
public class StatsCollector implements IStatsCollector {
- private static final long serialVersionUID = 6858817639895434574L;
+ private static final long serialVersionUID = 6858817639895434379L;
private final Map<String, IOperatorStats> operatorStatsMap = new LinkedHashMap<>();
@@ -61,7 +63,7 @@
@Override
public IOperatorStats getAggregatedStats() {
- IOperatorStats aggregatedStats = new OperatorStats("aggregated");
+ IOperatorStats aggregatedStats = new OperatorStats("aggregated", INVALID_ODID);
for (IOperatorStats stats : operatorStatsMap.values()) {
aggregatedStats.getInputTupleCounter().update(stats.getInputTupleCounter().get());
aggregatedStats.getTimeCounter().update(stats.getTimeCounter().get());
@@ -74,8 +76,8 @@
@Override
public void writeFields(DataOutput output) throws IOException {
output.writeInt(operatorStatsMap.size());
- for (IOperatorStats operatorStats : operatorStatsMap.values()) {
- operatorStats.writeFields(output);
+ for (IOperatorStats stats : operatorStatsMap.values()) {
+ stats.writeFields(output);
}
}
@@ -83,7 +85,7 @@
public void readFields(DataInput input) throws IOException {
int operatorCount = input.readInt();
for (int i = 0; i < operatorCount; i++) {
- IOperatorStats opStats = OperatorStats.create(input);
+ IOperatorStats opStats = IOperatorStats.create(input);
operatorStatsMap.put(opStats.getName(), opStats);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
index 871d2bc..40bc1ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.control.common.job.profiling.om;
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -184,7 +186,7 @@
}
IOperatorStats opOutStats = outStats[i];
if (opOutStats == null) {
- opOutStats = new OperatorStats(operatorName);
+ opOutStats = new OperatorStats(operatorName, INVALID_ODID);
outStats[i] = opOutStats;
}
opOutStats.updateFrom(opTaskStats);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 546360a..84376f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.control.common.job.profiling.om;
+import static org.apache.hyracks.api.job.profiling.NoOpOperatorStats.INVALID_ODID;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -130,8 +132,8 @@
jpe.put("name", key);
jpe.put("run-time", Double
.parseDouble(new DecimalFormat("#.####").format((double) value.getTimeCounter().get() / 1000000)));
- if (value.getId().getId() >= 0) {
- jpe.put("runtime-id", value.getId().toString());
+ if (!value.getOperatorId().equals(INVALID_ODID)) {
+ jpe.put("runtime-id", value.getOperatorId());
}
if (value.getPageReads().get() > 0) {
jpe.put("pages-read", value.getPageReads().get());
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable.java
new file mode 100644
index 0000000..7f0f5a9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.base;
+
+import org.apache.hyracks.api.dataflow.ISelfProfilingNodePushable;
+import org.apache.hyracks.api.dataflow.ITimedWriter;
+
+public abstract class AbstractUnaryInputUnaryOutputIntrospectingOperatorNodePushable
+ extends AbstractUnaryInputUnaryOutputOperatorNodePushable implements ISelfProfilingNodePushable, ITimedWriter {
+
+}