[NO ISSUE][RT] Report Disk IO Per Operator

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Do not collect per thread stats when runtime
  profiling is disabled.
- Include per operator disk IO in task profile.
- Remove aggregated disk IO count from the query
  response metrics.
- Adapt test cases.

Change-Id: Iced174ae5d4fa9ca13f597325cbb66ff80819e71
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/3783
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index a515def..89ffcbe 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -97,7 +97,6 @@
         private long processedObjects;
         private Profile profile;
         private ProfileType type;
-        private long diskIoCount;
         private long totalWarningsCount;
 
         public long getCount() {
@@ -124,14 +123,6 @@
             this.processedObjects = processedObjects;
         }
 
-        public long getDiskIoCount() {
-            return diskIoCount;
-        }
-
-        public void setDiskIoCount(long diskIoCount) {
-            this.diskIoCount = diskIoCount;
-        }
-
         public long getTotalWarningsCount() {
             return totalWarningsCount;
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
index 9cf8e6e..e5ed091 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
@@ -86,14 +86,6 @@
         return warnings;
     }
 
-    public void setDiskIoCount(long diskIoCount) {
-        this.diskIoCount = diskIoCount;
-    }
-
-    public long getDiskIoCount() {
-        return diskIoCount;
-    }
-
     /**
      * @return Total count of all warnings generated including unreported ones.
      */
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 69d5c70..acd6784 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -179,7 +179,6 @@
         IStatementExecutor.Stats responseStats = responseMsg.getStats();
         stats.setJobProfile(responseStats.getJobProfile());
         stats.setProcessedObjects(responseStats.getProcessedObjects());
-        stats.setDiskIoCount(responseStats.getDiskIoCount());
         stats.updateTotalWarningsCount(responseStats.getTotalWarningsCount());
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 4980f5e..5acdf3b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -101,7 +101,7 @@
                 printer.printResults();
                 ResponseMetrics metrics = ResponseMetrics.of(System.nanoTime() - elapsedStart,
                         metadata.getJobDuration(), stats.getCount(), stats.getSize(), metadata.getProcessedObjects(), 0,
-                        metadata.getTotalWarningsCount(), metadata.getDiskIoCount());
+                        metadata.getTotalWarningsCount());
                 printer.addFooterPrinter(new MetricsPrinter(metrics, HttpUtil.getPreferredCharset(request)));
                 if (metadata.getJobProfile() != null) {
                     printer.addFooterPrinter(new ProfilePrinter(metadata.getJobProfile()));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index c82bac2..72557be 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -586,9 +586,9 @@
             // in case of ASYNC delivery, the status is printed by query translator
             responsePrinter.addFooterPrinter(new StatusPrinter(execution.getResultStatus()));
         }
-        final ResponseMetrics metrics = ResponseMetrics.of(System.nanoTime() - elapsedStart, execution.duration(),
-                stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount,
-                stats.getTotalWarningsCount(), stats.getDiskIoCount());
+        final ResponseMetrics metrics =
+                ResponseMetrics.of(System.nanoTime() - elapsedStart, execution.duration(), stats.getCount(),
+                        stats.getSize(), stats.getProcessedObjects(), errorCount, stats.getTotalWarningsCount());
         responsePrinter.addFooterPrinter(new MetricsPrinter(metrics, resultCharset));
         if (isPrintingProfile(stats)) {
             responsePrinter.addFooterPrinter(new ProfilePrinter(stats.getJobProfile()));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index f34ee37..05073f9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -70,7 +70,6 @@
 
     private void aggregateJobStats(JobId jobId, ResultMetadata metadata) {
         long processedObjects = 0;
-        long diskIoCount = 0;
         long aggregateTotalWarningsCount = 0;
         Set<Warning> AggregateWarnings = new HashSet<>();
         IJobManager jobManager =
@@ -84,7 +83,6 @@
                 final Collection<TaskProfile> jobletTasksProfile = jp.getTaskProfiles().values();
                 for (TaskProfile tp : jobletTasksProfile) {
                     processedObjects += tp.getStatsCollector().getAggregatedStats().getTupleCounter().get();
-                    diskIoCount += tp.getStatsCollector().getAggregatedStats().getDiskIoCounter().get();
                     aggregateTotalWarningsCount += tp.getTotalWarningsCount();
                     Set<Warning> taskWarnings = tp.getWarnings();
                     if (AggregateWarnings.size() < maxWarnings && !taskWarnings.isEmpty()) {
@@ -98,7 +96,6 @@
         }
         metadata.setProcessedObjects(processedObjects);
         metadata.setWarnings(AggregateWarnings);
-        metadata.setDiskIoCount(diskIoCount);
         metadata.setTotalWarningsCount(aggregateTotalWarningsCount);
         if (run != null && run.getFlags() != null && run.getFlags().contains(JobFlag.PROFILE_RUNTIME)) {
             metadata.setJobProfile(run.getJobProfile().toJSON());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java
index bc91d54..8dfe923 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java
@@ -33,7 +33,7 @@
     }
 
     public static ResponseMetrics of(long elapsedTime, long executionTime, long resultCount, long resultSize,
-            long processedObjects, long errorCount, long warnCount, long diskIoCount) {
+            long processedObjects, long errorCount, long warnCount) {
         ResponseMetrics metrics = new ResponseMetrics();
         metrics.elapsedTime = elapsedTime;
         metrics.executionTime = executionTime;
@@ -42,7 +42,6 @@
         metrics.processedObjects = processedObjects;
         metrics.errorCount = errorCount;
         metrics.warnCount = warnCount;
-        metrics.diskIoCount = diskIoCount;
         return metrics;
     }
 
@@ -73,8 +72,4 @@
     public long getWarnCount() {
         return warnCount;
     }
-
-    public long getDiskIoCount() {
-        return diskIoCount;
-    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
index 21ffe0b..5549683 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
@@ -36,7 +36,6 @@
         RESULT_SIZE("resultSize"),
         ERROR_COUNT("errorCount"),
         PROCESSED_OBJECTS_COUNT("processedObjects"),
-        DISK_IO_COUNT("diskIoCount"),
         WARNING_COUNT("warningCount");
 
         private final String str;
@@ -78,16 +77,9 @@
         pw.print("\n\t");
         final boolean hasErrors = metrics.getErrorCount() > 0;
         final boolean hasWarnings = metrics.getWarnCount() > 0;
-        final boolean hasDiskIoStats = metrics.getDiskIoCount() > 0;
         ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(), metrics.getProcessedObjects(),
-                hasWarnings || hasErrors || hasDiskIoStats);
+                hasWarnings || hasErrors);
         pw.print("\n");
-        //TODO move diskIoCount to the profile printer when it is introduced
-        if (hasDiskIoStats) {
-            pw.print("\t");
-            ResultUtil.printField(pw, Metrics.DISK_IO_COUNT.str(), metrics.getDiskIoCount(), hasWarnings || hasErrors);
-            pw.print("\n");
-        }
         if (hasWarnings) {
             pw.print("\t");
             ResultUtil.printField(pw, Metrics.WARNING_COUNT.str(), metrics.getWarnCount(), hasErrors);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index aed358e..6bd876d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2559,7 +2559,6 @@
         if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) {
             stats.setJobProfile(resultMetadata.getJobProfile());
         }
-        stats.setDiskIoCount(resultMetadata.getDiskIoCount());
         stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount());
         WarningUtil.mergeWarnings(resultMetadata.getWarnings(), warningCollector);
     }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index af82754..789cdbc 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -259,6 +259,7 @@
                 ObjectMapper OM = new ObjectMapper();
                 JsonNode expectedJson = OM.readTree(readerExpected);
                 JsonNode actualJson = OM.readTree(readerActual);
+                System.out.println(OM.writeValueAsString(actualJson));
                 if (expectedJson == null || actualJson == null) {
                     throw new NullPointerException("Error parsing expected or actual result file for " + scriptFile);
                 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml b/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
index 81093a3..da7ba31 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
@@ -40,10 +40,5 @@
         <output-dir compare="Text">secondary-index-index-only</output-dir>
       </compilation-unit>
     </test-case>
-    <test-case FilePath="metrics">
-      <compilation-unit name="disk-io-count">
-        <output-dir compare="Text">disk-io-count</output-dir>
-      </compilation-unit>
-    </test-case>
   </test-group>
 </test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.1.ddl.sqlpp
deleted file mode 100644
index 3fb3ceb..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.1.ddl.sqlpp
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description  : Disk io count metrics on full scan
- * Expected Res : Success
- * Date         : 10 Sep 2019
- */
-
-drop dataverse test if exists;
-create dataverse test;
-
-use test;
-
-create type test.AddressType as
-{
-  number : bigint,
-  street : string,
-  city : string
-};
-
-create type test.CustomerType as
- closed {
-  cid : bigint,
-  name : string,
-  age : bigint?,
-  address : AddressType?,
-  lastorder : {
-      oid : bigint,
-      total : float
-  }
-};
-
-create dataset Customers(CustomerType) primary key cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.2.update.sqlpp
deleted file mode 100644
index 348a66e..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.2.update.sqlpp
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description  : Disk io count metrics on full scan
- * Expected Res : Success
- * Date         : 10 Sep 2019
- */
-
-use test;
-
-load dataset Customers using localfs
-  ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`),
-  (`format`=`adm`));
-
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.3.metrics.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.3.metrics.sqlpp
deleted file mode 100644
index 7561a2c..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.3.metrics.sqlpp
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description  : Disk io count metrics on full scan
- * Expected Res : Success
- * Date         : 10 Sep 2019
- */
-
-use test;
-
-select count(*) from Customers
-where name = "Marvella Loud";
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.4.ddl.sqlpp
deleted file mode 100644
index 01c285e..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/disk-io-count/disk-io-count.4.ddl.sqlpp
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description  : Disk io count metrics on full scan
- * Expected Res : Success
- * Date         : 10 Sep 2019
- */
-
-drop dataverse test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/disk-io-count/disk-io-count.3.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/disk-io-count/disk-io-count.3.regexadm
deleted file mode 100644
index 7eb0b1d..0000000
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/disk-io-count/disk-io-count.3.regexadm
+++ /dev/null
@@ -1 +0,0 @@
-.*"diskIoCount":[0-9]+.*
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
index 82e7128..c6f99d1b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
@@ -30,15 +30,18 @@
                     "counters": [
                         {
                             "name": "Empty Tuple Source",
-                            "time": "R{[0-9.]+}"
+                            "time": "R{[0-9.]+}",
+                            "disk-io": "R{[0-9.]+}"
                         },
                         {
                             "name": "Index Search",
-                            "time": "R{[0-9.]+}"
+                            "time": "R{[0-9.]+}",
+                            "disk-io": "R{[0-9.]+}"
                         },
                         {
                             "name": "R{.+}",
-                            "time": "R{[0-9.]+}"
+                            "time": "R{[0-9.]+}",
+                            "disk-io": "R{[0-9.]+}"
                         }
                     ]
                 },
@@ -50,11 +53,13 @@
                   "counters": [
                     {
                       "name": "R{.+}",
-                      "time": "R{[0-9.]+}"
+                      "time": "R{[0-9.]+}",
+                      "disk-io": "R{[0-9.]+}"
                     },
                     {
                       "name": "Result Writer",
-                      "time": "R{[0-9.]+}"
+                      "time": "R{[0-9.]+}",
+                      "disk-io": "R{[0-9.]+}"
                     }
                   ]
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 1e7a3b7..dab6d26 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -82,7 +82,6 @@
 
     @Override
     public ObjectNode toJSON() {
-
         ObjectMapper om = new ObjectMapper();
         ObjectNode json = om.createObjectNode();
 
@@ -131,6 +130,7 @@
             jpe.put("name", key);
             jpe.put("time", Double
                     .parseDouble(new DecimalFormat("#.####").format((double) value.getTimeCounter().get() / 1000000)));
+            jpe.put("disk-io", value.getDiskIoCounter().get());
             countersObj.add(jpe);
         });
         json.set("counters", countersObj);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 8a6389a..e58e4a4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -507,7 +507,9 @@
 
     @Override
     public synchronized void subscribeThreadToStats(IThreadStatsCollector threadStatsCollector) {
-        //TODO do this only when profiling is enabled
+        if (!isRuntimeProfilingEnabled()) {
+            return;
+        }
         synchronized (threadStatsCollectors) {
             threadStatsCollectors.add(threadStatsCollector);
             final long threadId = Thread.currentThread().getId();
@@ -518,6 +520,9 @@
 
     @Override
     public synchronized void unsubscribeThreadFromStats() {
+        if (!isRuntimeProfilingEnabled()) {
+            return;
+        }
         synchronized (threadStatsCollectors) {
             threadStatsCollectors.forEach(IThreadStatsCollector::unsubscribe);
         }
@@ -559,4 +564,8 @@
         return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"node\" : \"" + ncs.getId() + "\" \"jobId\" : \""
                 + joblet.getJobId() + "\", \"taskId\" : \"" + taskAttemptId + "\" }";
     }
+
+    private boolean isRuntimeProfilingEnabled() {
+        return getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
+    }
 }