[NO ISSUE][RT] Add Thread-Based Stats Collector
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add infrastructure to allow collecting thread-based stats at
runtime for any thread that belongs to a task.
- Collect the number of pinned pages per thread and report it
in the TaskProfile.
- Aggregate the pinned pages counters from all job tasks and
report the total as diskIoCount in the metrics field of the JSON
response. The plan is to move this stat to the profile
field when that field is introduced.
- Collecting pinned pages stats is currently enabled by
default for any job with IndexSearchOperatorNodePushable.
The plan is to allow enabling/disabling as part of the
job profiling change.
- Add test case for diskIoCount metric.
- Remove unused IndexSearchOperatorNodePushable constructor.
Change-Id: I44dfcedcadb3d0f48815b521e7d495e473b02e3d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3555
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 64a4452..d7281e3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -83,6 +83,7 @@
private long count;
private long size;
private long processedObjects;
+ private long diskIoCount;
public long getCount() {
return count;
@@ -107,6 +108,14 @@
public void setProcessedObjects(long processedObjects) {
this.processedObjects = processedObjects;
}
+
+ public long getDiskIoCount() {
+ return diskIoCount;
+ }
+
+ public void setDiskIoCount(long diskIoCount) {
+ this.diskIoCount = diskIoCount;
+ }
}
/**
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
index 6fb37d3..3ce09a6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
@@ -29,6 +29,7 @@
private final SessionConfig.OutputFormat format;
private long jobDuration;
private long processedObjects;
+ private long diskIoCount;
private Set<Warning> warnings;
public ResultMetadata(SessionConfig.OutputFormat format) {
@@ -63,9 +64,17 @@
return warnings;
}
+ public void setDiskIoCount(long diskIoCount) {
+ this.diskIoCount = diskIoCount;
+ }
+
+ public long getDiskIoCount() {
+ return diskIoCount;
+ }
+
@Override
public String toString() {
return "ResultMetadata{" + "format=" + format + ", jobDuration=" + jobDuration + ", processedObjects="
- + processedObjects + '}';
+ + processedObjects + ", diskIoCount=" + diskIoCount + '}';
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 2b06dbb..a39c716 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -98,9 +98,9 @@
printer.begin();
printer.addResultPrinter(new ResultsPrinter(appCtx, resultReader, null, stats, sessionOutput));
printer.printResults();
- ResponseMertics mertics =
- ResponseMertics.of(System.nanoTime() - elapsedStart, metadata.getJobDuration(),
- stats.getCount(), stats.getSize(), metadata.getProcessedObjects(), 0, 0);
+ ResponseMertics mertics = ResponseMertics.of(System.nanoTime() - elapsedStart,
+ metadata.getJobDuration(), stats.getCount(), stats.getSize(), metadata.getProcessedObjects(), 0,
+ 0, metadata.getDiskIoCount());
printer.addFooterPrinter(new MetricsPrinter(mertics, HttpUtil.getPreferredCharset(request)));
printer.printFooters();
printer.end();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 31f0f2b..c9aeabc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -563,7 +563,8 @@
responsePrinter.addFooterPrinter(new StatusPrinter(execution.getResultStatus()));
}
final ResponseMertics mertics = ResponseMertics.of(System.nanoTime() - elapsedStart, execution.duration(),
- stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount, warnings.size());
+ stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount, warnings.size(),
+ stats.getDiskIoCount());
responsePrinter.addFooterPrinter(new MetricsPrinter(mertics, resultCharset));
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index ab17cdd..b152fc9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -68,6 +68,7 @@
private void aggregateJobStats(JobId jobId, ResultMetadata metadata) {
long processedObjects = 0;
+ long diskIoCount = 0;
Set<Warning> warnings = new HashSet<>();
IJobManager jobManager =
((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobManager();
@@ -79,11 +80,13 @@
final Collection<TaskProfile> jobletTasksProfile = jp.getTaskProfiles().values();
for (TaskProfile tp : jobletTasksProfile) {
processedObjects += tp.getStatsCollector().getAggregatedStats().getTupleCounter().get();
+ diskIoCount += tp.getStatsCollector().getAggregatedStats().getDiskIoCounter().get();
warnings.addAll(tp.getWarnings());
}
}
}
metadata.setProcessedObjects(processedObjects);
metadata.setWarnings(warnings);
+ metadata.setDiskIoCount(diskIoCount);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMertics.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMertics.java
index 666b759..03a20a5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMertics.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMertics.java
@@ -27,12 +27,13 @@
private long processedObjects;
private long errorCount;
private long warnCount;
+ private long diskIoCount;
private ResponseMertics() {
}
public static ResponseMertics of(long elapsedTime, long executionTime, long resultCount, long resultSize,
- long processedObjects, long errorCount, long warnCount) {
+ long processedObjects, long errorCount, long warnCount, long diskIoCount) {
ResponseMertics mertics = new ResponseMertics();
mertics.elapsedTime = elapsedTime;
mertics.executionTime = executionTime;
@@ -41,6 +42,7 @@
mertics.processedObjects = processedObjects;
mertics.errorCount = errorCount;
mertics.warnCount = warnCount;
+ mertics.diskIoCount = diskIoCount;
return mertics;
}
@@ -71,4 +73,8 @@
public long getWarnCount() {
return warnCount;
}
+
+ public long getDiskIoCount() {
+ return diskIoCount;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
index 35b5f43e..117441e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
@@ -36,6 +36,7 @@
RESULT_SIZE("resultSize"),
ERROR_COUNT("errorCount"),
PROCESSED_OBJECTS_COUNT("processedObjects"),
+ DISK_IO_COUNT("diskIoCount"),
WARNING_COUNT("warningCount");
private final String str;
@@ -77,9 +78,16 @@
pw.print("\n\t");
final boolean hasErrors = mertics.getErrorCount() > 0;
final boolean hasWarnings = mertics.getWarnCount() > 0;
+ final boolean hasDiskIoStats = mertics.getDiskIoCount() > 0;
ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(), mertics.getProcessedObjects(),
- hasWarnings || hasErrors);
+ hasWarnings || hasErrors || hasDiskIoStats);
pw.print("\n");
+ //TODO move diskIoCount to the profile printer when it is introduced
+ if (hasDiskIoStats) {
+ pw.print("\t");
+ ResultUtil.printField(pw, Metrics.DISK_IO_COUNT.str(), mertics.getDiskIoCount(), hasWarnings || hasErrors);
+ pw.print("\n");
+ }
if (hasWarnings) {
pw.print("\t");
ResultUtil.printField(pw, Metrics.WARNING_COUNT.str(), mertics.getWarnCount(), hasErrors);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java
index 000dd6a..531e39e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java
@@ -62,6 +62,7 @@
List<Triple<JobId, ResultSetId, ARecordType>> resultSets = resultMetadata.getResultSets();
if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultSets.isEmpty()) {
stats.setProcessedObjects(responseMsg.getStats().getProcessedObjects());
+ stats.setDiskIoCount(responseMsg.getStats().getDiskIoCount());
for (int i = 0; i < resultSets.size(); i++) {
Triple<JobId, ResultSetId, ARecordType> rsmd = resultSets.get(i);
ResultReader resultReader = new ResultReader(resultSet, rsmd.getLeft(), rsmd.getMiddle());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index a59adb7..74997f7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2549,6 +2549,7 @@
(org.apache.asterix.api.common.ResultMetadata) controllerService.getResultDirectoryService()
.getResultMetadata(jobId, rsId);
stats.setProcessedObjects(resultMetadata.getProcessedObjects());
+ stats.setDiskIoCount(resultMetadata.getDiskIoCount());
warningCollector.warn(resultMetadata.getWarnings());
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml b/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
index da7ba31..81093a3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
@@ -40,5 +40,10 @@
<output-dir compare="Text">secondary-index-index-only</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="metrics">
+ <compilation-unit name="disk-io-count">
+ <output-dir compare="Text">disk-io-count</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.1.ddl.sqlpp
new file mode 100644
index 0000000..3fb3ceb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.1.ddl.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Disk io count metrics on full scan
+ * Expected Res : Success
+ * Date : 10 Sep 2019
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type test.AddressType as
+{
+ number : bigint,
+ street : string,
+ city : string
+};
+
+create type test.CustomerType as
+ closed {
+ cid : bigint,
+ name : string,
+ age : bigint?,
+ address : AddressType?,
+ lastorder : {
+ oid : bigint,
+ total : float
+ }
+};
+
+create dataset Customers(CustomerType) primary key cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.2.update.sqlpp
new file mode 100644
index 0000000..348a66e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.2.update.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Disk io count metrics on full scan
+ * Expected Res : Success
+ * Date : 10 Sep 2019
+ */
+
+use test;
+
+load dataset Customers using localfs
+ ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`),
+ (`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.3.metrics.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.3.metrics.sqlpp
new file mode 100644
index 0000000..7561a2c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.3.metrics.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Disk io count metrics on full scan
+ * Expected Res : Success
+ * Date : 10 Sep 2019
+ */
+
+use test;
+
+select count(*) from Customers
+where name = "Marvella Loud";
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.4.ddl.sqlpp
new file mode 100644
index 0000000..01c285e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.4.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Disk io count metrics on full scan
+ * Expected Res : Success
+ * Date : 10 Sep 2019
+ */
+
+drop dataverse test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/disk-io-count/disk-io-count.3.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/disk-io-count/disk-io-count.3.regexadm
new file mode 100644
index 0000000..7eb0b1d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/disk-io-count/disk-io-count.3.regexadm
@@ -0,0 +1 @@
+.*"diskIoCount":[0-9]+.*
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index 1ffcf9f..4561af1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -33,6 +33,8 @@
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatableRegistry;
import org.apache.hyracks.api.result.IResultPartitionManager;
+import org.apache.hyracks.util.IThreadStats;
+import org.apache.hyracks.util.IThreadStatsCollector;
public interface IHyracksTaskContext
extends IHyracksCommonContext, IWorkspaceFileFactory, IDeallocatableRegistry, IOperatorEnvironment {
@@ -61,4 +63,23 @@
IStatsCollector getStatsCollector();
IWarningCollector getWarningCollector();
+
+ /**
+ * Subscribes the caller thread to {@code threadStatsCollector}
+ *
+ * @param threadStatsCollector the collector to subscribe the caller thread to
+ */
+ void subscribeThreadToStats(IThreadStatsCollector threadStatsCollector);
+
+ /**
+ * Unsubscribes the caller thread from all thread stats collectors known to this task
+ */
+ void unsubscribeThreadFromStats();
+
+ /**
+ * Gets the caller thread's stats
+ *
+ * @return the thread stats
+ */
+ IThreadStats getThreadStats();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
index 35ae8b6..181249a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
@@ -41,4 +41,9 @@
* of an operator
*/
ICounter getTimeCounter();
+
+ /**
+ * @return A counter used to track the number of pages pinned by an operator
+ */
+ ICounter getDiskIoCounter();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 46a1dec..12d696e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -214,6 +214,7 @@
failures.offer(th);
throw th;
} finally {
+ ctx.unsubscribeThreadFromStats();
completeSemaphore.release();
}
return null;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
index 185e197..66fb797 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
@@ -32,6 +32,7 @@
public final String operatorName;
public final ICounter tupleCounter;
public final ICounter timeCounter;
+ public final ICounter diskIoCounter;
public OperatorStats(String operatorName) {
if (operatorName == null || operatorName.isEmpty()) {
@@ -40,6 +41,7 @@
this.operatorName = operatorName;
tupleCounter = new Counter("tupleCounter");
timeCounter = new Counter("timeCounter");
+ diskIoCounter = new Counter("diskIoCounter");
}
public static IOperatorStats create(DataInput input) throws IOException {
@@ -65,15 +67,22 @@
}
@Override
+ public ICounter getDiskIoCounter() {
+ return diskIoCounter;
+ }
+
+ @Override
public void writeFields(DataOutput output) throws IOException {
output.writeUTF(operatorName);
output.writeLong(tupleCounter.get());
output.writeLong(timeCounter.get());
+ output.writeLong(diskIoCounter.get());
}
@Override
public void readFields(DataInput input) throws IOException {
tupleCounter.set(input.readLong());
timeCounter.set(input.readLong());
+ diskIoCounter.set(input.readLong());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
index e95c107..8da19ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
@@ -57,7 +57,8 @@
IOperatorStats aggregatedStats = new OperatorStats("aggregated");
for (IOperatorStats stats : operatorStatsMap.values()) {
aggregatedStats.getTupleCounter().update(stats.getTupleCounter().get());
- aggregatedStats.getTimeCounter().update(stats.getTupleCounter().get());
+ aggregatedStats.getTimeCounter().update(stats.getTimeCounter().get());
+ aggregatedStats.getDiskIoCounter().update(stats.getDiskIoCounter().get());
}
return aggregatedStats;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 5bb713d..06f3aa9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -24,6 +24,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Hashtable;
import java.util.LinkedHashSet;
import java.util.List;
@@ -71,6 +72,9 @@
import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
import org.apache.hyracks.control.nc.work.NotifyTaskCompleteWork;
import org.apache.hyracks.control.nc.work.NotifyTaskFailureWork;
+import org.apache.hyracks.util.IThreadStats;
+import org.apache.hyracks.util.IThreadStatsCollector;
+import org.apache.hyracks.util.ThreadStats;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -122,6 +126,10 @@
private final IWarningCollector warningCollector;
+ private final Set<IThreadStatsCollector> threadStatsCollectors = new HashSet<>();
+
+ private final Map<Long, IThreadStats> perThreadStats = new HashMap<>();
+
public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId, String displayName,
ExecutorService executor, NodeControllerService ncs,
List<List<PartitionChannel>> inputChannelsFromConnectors) {
@@ -202,6 +210,7 @@
public void close() {
deallocatableRegistry.close();
+ threadStatsCollectors.forEach(IThreadStatsCollector::unsubscribe);
}
@Override
@@ -335,6 +344,7 @@
removePendingThread(thread);
}
} finally {
+ unsubscribeThreadFromStats();
sem.release();
}
});
@@ -342,6 +352,7 @@
try {
pushFrames(collectors[0], inputChannelsFromConnectors.get(0), operator.getInputFrameWriter(0));
} finally {
+ unsubscribeThreadFromStats();
sem.acquireUninterruptibly(collectors.length - 1);
}
}
@@ -486,6 +497,31 @@
return warningCollector;
}
+ @Override
+ public IThreadStats getThreadStats() {
+ synchronized (threadStatsCollectors) {
+ return perThreadStats.computeIfAbsent(Thread.currentThread().getId(), threadId -> new ThreadStats());
+ }
+ }
+
+ @Override
+ public synchronized void subscribeThreadToStats(IThreadStatsCollector threadStatsCollector) {
+ //TODO do this only when profiling is enabled
+ synchronized (threadStatsCollectors) {
+ threadStatsCollectors.add(threadStatsCollector);
+ final long threadId = Thread.currentThread().getId();
+ IThreadStats threadStat = perThreadStats.computeIfAbsent(threadId, id -> new ThreadStats());
+ threadStatsCollector.subscribe(threadStat);
+ }
+ }
+
+ @Override
+ public synchronized void unsubscribeThreadFromStats() {
+ synchronized (threadStatsCollectors) {
+ threadStatsCollectors.forEach(IThreadStatsCollector::unsubscribe);
+ }
+ }
+
public boolean isCompleted() {
return completed;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 2db0ec0..d504a96 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -56,6 +56,7 @@
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.util.IThreadStatsCollector;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -123,16 +124,6 @@
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
- ITupleFilterFactory tupleFilterFactory, long outputLimit) throws HyracksDataException {
- this(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
- retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
- tupleFilterFactory, outputLimit, false, null, null);
- }
-
- public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
- int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
- boolean retainInput, boolean retainMissing, IMissingWriterFactory missingWriterFactory,
- ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
ITupleFilterFactory tupleFactoryFactory, long outputLimit, boolean appendSearchCallbackProceedResult,
byte[] searchCallbackProceedResultFalseValue, byte[] searchCallbackProceedResultTrueValue)
throws HyracksDataException {
@@ -189,6 +180,7 @@
writer.open();
indexHelper.open();
index = indexHelper.getIndexInstance();
+ subscribeForStats(index);
accessor = new FrameTupleAccessor(inputRecDesc);
if (retainMissing) {
int fieldCount = getFieldCount();
@@ -317,6 +309,7 @@
if (appender.getTupleCount() > 0) {
appender.write(writer, true);
}
+ stats.getDiskIoCounter().update(ctx.getThreadStats().getPinnedPagesCount());
} catch (Throwable th) { // NOSONAR Must ensure writer.fail is called.
// subsequently, the failure will be thrown
failure = th;
@@ -343,6 +336,12 @@
writer.fail();
}
+ private void subscribeForStats(IIndex index) {
+ if (index.getBufferCache() instanceof IThreadStatsCollector) {
+ ctx.subscribeThreadToStats((IThreadStatsCollector) index.getBufferCache());
+ }
+ }
+
private void writeTupleToOutput(ITupleReference tuple) throws IOException {
try {
for (int i = 0; i < tuple.getFieldCount(); i++) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index c4c5d8a..55875a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -47,11 +47,13 @@
import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.storage.common.file.IFileMapManager;
+import org.apache.hyracks.util.IThreadStats;
+import org.apache.hyracks.util.IThreadStatsCollector;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
+public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent, IThreadStatsCollector {
private static final Logger LOGGER = LogManager.getLogger();
private static final int MAP_FACTOR = 3;
@@ -79,6 +81,7 @@
private IIOReplicationManager ioReplicationManager;
private final List<ICachedPageInternal> cachedPages = new ArrayList<>();
private final AtomicLong masterPinCount = new AtomicLong();
+ private final Map<Long, IThreadStats> statsSubscribers = new ConcurrentHashMap<>();
private boolean closed;
@@ -170,6 +173,10 @@
if (DEBUG) {
pinSanityCheck(dpid);
}
+ final IThreadStats threadStats = statsSubscribers.get(Thread.currentThread().getId());
+ if (threadStats != null) {
+ threadStats.pagePinned();
+ }
CachedPage cPage = findPage(dpid);
if (!newPage) {
if (DEBUG) {
@@ -578,6 +585,16 @@
}
}
+ @Override
+ public void subscribe(IThreadStats stats) {
+ statsSubscribers.put(Thread.currentThread().getId(), stats); // keyed by the calling thread's id
+ }
+
+ @Override
+ public void unsubscribe() {
+ statsSubscribers.remove(Thread.currentThread().getId()); // drops the calling thread's entry, if any
+ }
+
private int hash(long dpid) {
int hashValue = (int) dpid ^ (Integer.reverse((int) (dpid >>> 32)) >>> 1);
return hashValue % pageMap.length;
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index f264a4b..7c602f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -42,6 +42,9 @@
import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.control.common.job.profiling.StatsCollector;
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
+import org.apache.hyracks.util.IThreadStats;
+import org.apache.hyracks.util.IThreadStatsCollector;
+import org.apache.hyracks.util.ThreadStats;
public class TestTaskContext implements IHyracksTaskContext {
private final TestJobletContext jobletContext;
@@ -50,6 +53,7 @@
private Map<Object, IStateObject> stateObjectMap = new HashMap<>();
private Object sharedObject;
private final IStatsCollector statsCollector = new StatsCollector();
+ private final ThreadStats threadStats = new ThreadStats();
public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) {
this.jobletContext = jobletContext;
@@ -181,4 +185,19 @@
public IWarningCollector getWarningCollector() {
return TestUtils.NOOP_WARNING_COLLECTOR;
}
+
+ @Override
+ public void subscribeThreadToStats(IThreadStatsCollector threadStatsCollector) {
+ // no op: this test context ignores the collector and subscribes nothing
+ }
+
+ @Override
+ public void unsubscribeThreadFromStats() {
+ // no op: this test context never subscribes, so there is nothing to remove
+ }
+
+ @Override
+ public IThreadStats getThreadStats() {
+ return threadStats; // the single instance created together with this context
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
index aec9e35..b67d909 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/org/apache/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
@@ -107,6 +107,9 @@
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.test.support.TestUtils;
+import org.apache.hyracks.util.IThreadStats;
+import org.apache.hyracks.util.IThreadStatsCollector;
+import org.apache.hyracks.util.ThreadStats;
@SuppressWarnings("rawtypes")
public class LSMInvertedIndexTestUtils {
@@ -649,6 +652,7 @@
// results during inverted index searches for the test purposes only.
public static class HyracksTaskTestContext implements IHyracksTaskContext {
private final int FRAME_SIZE = AccessMethodTestsConfig.LSM_INVINDEX_HYRACKS_FRAME_SIZE;
+ private final ThreadStats threadStats = new ThreadStats();
private Object sharedObject;
@Override
@@ -771,6 +775,21 @@
public IWarningCollector getWarningCollector() {
return TestUtils.NOOP_WARNING_COLLECTOR;
}
+
+ @Override
+ public void subscribeThreadToStats(IThreadStatsCollector threadStatsCollector) {
+ // no op: this test context ignores the collector and subscribes nothing
+ }
+
+ @Override
+ public void unsubscribeThreadFromStats() {
+ // no op: this test context never subscribes, so there is nothing to remove
+ }
+
+ @Override
+ public IThreadStats getThreadStats() {
+ return threadStats; // the single instance created together with this context
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IThreadStats.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IThreadStats.java
new file mode 100644
index 0000000..dc63ac4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IThreadStats.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+public interface IThreadStats {
+
+ /**
+ * Records that this thread attempted to pin a page; call once per attempt
+ */
+ void pagePinned();
+
+ /**
+ * Gets the count of attempts made by this thread to pin a page
+ *
+ * @return the number of pin attempts recorded through {@link #pagePinned()}
+ */
+ long getPinnedPagesCount();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IThreadStatsCollector.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IThreadStatsCollector.java
new file mode 100644
index 0000000..ea82a86
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IThreadStatsCollector.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+public interface IThreadStatsCollector {
+
+ /**
+ * Subscribes the calling thread to this stats collector
+ *
+ * @param stats the stats object to associate with the calling thread
+ */
+ void subscribe(IThreadStats stats);
+
+ /**
+ * Unsubscribes the calling thread from this stats collector
+ */
+ void unsubscribe();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadStats.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadStats.java
new file mode 100644
index 0000000..c79eefc
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadStats.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class ThreadStats implements IThreadStats {
+ // final: the counter reference is never reassigned and is safely visible to every thread
+ private final AtomicLong pinnedPagesCount = new AtomicLong();
+
+ @Override
+ public void pagePinned() {
+ pinnedPagesCount.incrementAndGet();
+ }
+
+ @Override
+ public long getPinnedPagesCount() {
+ return pinnedPagesCount.get();
+ }
+}