[ASTERIXDB-3439][STO] Exposing cloud read metrics

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

This patch adds following three metrics related to cloud reads
1. remoteStorageReadRequest - Gets the count of cloud request to remote storage
2. remoteStoragePageRead - The count of pages read from the remote storage
3. remoteStoragePagePersist - The count of fetched page is persisted in the disk.

Change-Id: I5d547e4267ec93c97f7b5398ce925fbb73262010
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18388
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index b624c9b..55d3276 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -140,6 +140,9 @@
         private long compileTime;
         private double bufferCacheHitRatio;
         private long bufferCachePageReadCount;
+        private long cloudReadRequestsCount;
+        private long cloudPagesReadCount;
+        private long cloudPagesPersistedCount;
 
         public long getCount() {
             return count;
@@ -222,6 +225,30 @@
         public long getBufferCachePageReadCount() {
             return bufferCachePageReadCount;
         }
+
+        public void setCloudReadRequestsCount(long cloudReadRequestsCount) {
+            this.cloudReadRequestsCount = cloudReadRequestsCount;
+        }
+
+        public long getCloudReadRequestsCount() {
+            return cloudReadRequestsCount;
+        }
+
+        public void setCloudPagesReadCount(long cloudPagesReadCount) {
+            this.cloudPagesReadCount = cloudPagesReadCount;
+        }
+
+        public long getCloudPagesReadCount() {
+            return cloudPagesReadCount;
+        }
+
+        public void setCloudPagesPersistedCount(long cloudPagesPersistedCount) {
+            this.cloudPagesPersistedCount = cloudPagesPersistedCount;
+        }
+
+        public long getCloudPagesPersistedCount() {
+            return cloudPagesPersistedCount;
+        }
     }
 
     class Profile implements Serializable {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultMetadata.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultMetadata.java
index af06817..5d7705b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultMetadata.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultMetadata.java
@@ -40,6 +40,9 @@
     private transient List<Object> outputTypes;
     private long queueWaitTimeInNanos;
     private long bufferCachePageReadCount;
+    private long cloudReadRequestsCount;
+    private long cloudPagesReadCount;
+    private long cloudPagesPersistedCount;
 
     public ResultMetadata(SessionConfig.OutputFormat format) {
         this.format = format;
@@ -77,6 +80,30 @@
         return bufferCachePageReadCount;
     }
 
+    public void setCloudReadRequestsCount(long cloudReadRequestsCount) {
+        this.cloudReadRequestsCount = cloudReadRequestsCount;
+    }
+
+    public long getCloudReadRequestsCount() {
+        return cloudReadRequestsCount;
+    }
+
+    public void setCloudPagesReadCount(long cloudPagesReadCount) {
+        this.cloudPagesReadCount = cloudPagesReadCount;
+    }
+
+    public long getCloudPagesReadCount() {
+        return cloudPagesReadCount;
+    }
+
+    public void setCloudPagesPersistedCount(long cloudPagesPersistedCount) {
+        this.cloudPagesPersistedCount = cloudPagesPersistedCount;
+    }
+
+    public long getCloudPagesPersistedCount() {
+        return cloudPagesPersistedCount;
+    }
+
     public void setWarnings(Set<Warning> warnings) {
         this.warnings = warnings;
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 65d4734..bedcd9c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -209,6 +209,9 @@
         stats.setQueueWaitTime(responseStats.getQueueWaitTime());
         stats.setBufferCacheHitRatio(responseStats.getBufferCacheHitRatio());
         stats.setBufferCachePageReadCount(responseStats.getBufferCachePageReadCount());
+        stats.setCloudReadRequestsCount(responseStats.getCloudReadRequestsCount());
+        stats.setCloudPagesReadCount(responseStats.getCloudPagesReadCount());
+        stats.setCloudPagesPersistedCount(responseStats.getCloudPagesPersistedCount());
     }
 
     private static void updatePropertiesFromCC(IStatementExecutor.StatementProperties statementProperties,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index c2e8dca..77846d5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -103,7 +103,9 @@
                 ResponseMetrics metrics = ResponseMetrics.of(System.nanoTime() - elapsedStart,
                         metadata.getJobDuration(), stats.getCount(), stats.getSize(), metadata.getProcessedObjects(), 0,
                         metadata.getTotalWarningsCount(), stats.getCompileTime(), stats.getQueueWaitTime(),
-                        stats.getBufferCacheHitRatio(), stats.getBufferCachePageReadCount());
+                        stats.getBufferCacheHitRatio(), stats.getBufferCachePageReadCount(),
+                        stats.getCloudReadRequestsCount(), stats.getCloudPagesReadCount(),
+                        stats.getCloudPagesPersistedCount());
                 printer.addFooterPrinter(new MetricsPrinter(metrics, HttpUtil.getPreferredCharset(request)));
                 if (metadata.getJobProfile() != null) {
                     printer.addFooterPrinter(new ProfilePrinter(metadata.getJobProfile()));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 4798f9e..07d9632 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -369,7 +369,8 @@
         final ResponseMetrics metrics = ResponseMetrics.of(System.nanoTime() - elapsedStart, executionState.duration(),
                 stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount,
                 stats.getTotalWarningsCount(), stats.getCompileTime(), stats.getQueueWaitTime(),
-                stats.getBufferCacheHitRatio(), stats.getBufferCachePageReadCount());
+                stats.getBufferCacheHitRatio(), stats.getBufferCachePageReadCount(), stats.getCloudReadRequestsCount(),
+                stats.getCloudPagesReadCount(), stats.getCloudPagesPersistedCount());
         responsePrinter.addFooterPrinter(new MetricsPrinter(metrics, resultCharset));
         if (isPrintingProfile(stats)) {
             responsePrinter.addFooterPrinter(new ProfilePrinter(stats.getJobProfile()));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index c419d0d..b5a2afe 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -74,6 +74,9 @@
         long aggregateTotalWarningsCount = 0;
         long pagesRead = 0;
         long nonPagedReads = 0;
+        long cloudReadRequestsCount = 0;
+        long cloudPagesRead = 0;
+        long cloudPagesPersisted = 0;
         Set<Warning> AggregateWarnings = new HashSet<>();
         IJobManager jobManager =
                 ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobManager();
@@ -88,6 +91,10 @@
                     processedObjects += tp.getStatsCollector().getAggregatedStats().getInputTupleCounter().get();
                     pagesRead += tp.getStatsCollector().getAggregatedStats().getPageReads().get();
                     nonPagedReads += tp.getStatsCollector().getAggregatedStats().coldReadCounter().get();
+                    cloudReadRequestsCount +=
+                            tp.getStatsCollector().getAggregatedStats().cloudReadRequestCounter().get();
+                    cloudPagesRead += tp.getStatsCollector().getAggregatedStats().cloudReadPageCounter().get();
+                    cloudPagesPersisted += tp.getStatsCollector().getAggregatedStats().cloudPersistPageCounter().get();
                     aggregateTotalWarningsCount += tp.getTotalWarningsCount();
                     Set<Warning> taskWarnings = tp.getWarnings();
                     if (AggregateWarnings.size() < maxWarnings && !taskWarnings.isEmpty()) {
@@ -103,6 +110,9 @@
         metadata.setProcessedObjects(processedObjects);
         metadata.setBufferCacheHitRatio(pagesRead > 0 ? (pagesRead - nonPagedReads) / (double) pagesRead : Double.NaN);
         metadata.setBufferCachePageReadCount(pagesRead);
+        metadata.setCloudReadRequestsCount(cloudReadRequestsCount);
+        metadata.setCloudPagesReadCount(cloudPagesRead);
+        metadata.setCloudPagesPersistedCount(cloudPagesPersisted);
         metadata.setWarnings(AggregateWarnings);
         metadata.setTotalWarningsCount(aggregateTotalWarningsCount);
         if (run != null && run.getFlags() != null && run.getFlags().contains(JobFlag.PROFILE_RUNTIME)) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java
index 8db483c..766618f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java
@@ -32,13 +32,17 @@
     private long queueWaitTime;
     private double bufferCacheHitRatio;
     private long bufferCachePageReadCount;
+    private long cloudReadRequestsCount;
+    private long cloudPagesReadCount;
+    private long cloudPagesPersistedCount;
 
     private ResponseMetrics() {
     }
 
     public static ResponseMetrics of(long elapsedTime, long executionTime, long resultCount, long resultSize,
             long processedObjects, long errorCount, long warnCount, long compileTime, long queueWaitTime,
-            double bufferCacheHitRatio, long bufferCachePageReadCount) {
+            double bufferCacheHitRatio, long bufferCachePageReadCount, long cloudRequestsCount,
+            long cloudPagesReadCount, long cloudPagesPersistedCount) {
         ResponseMetrics metrics = new ResponseMetrics();
         metrics.elapsedTime = elapsedTime;
         metrics.executionTime = executionTime;
@@ -51,6 +55,9 @@
         metrics.queueWaitTime = queueWaitTime;
         metrics.bufferCacheHitRatio = bufferCacheHitRatio;
         metrics.bufferCachePageReadCount = bufferCachePageReadCount;
+        metrics.cloudReadRequestsCount = cloudRequestsCount;
+        metrics.cloudPagesReadCount = cloudPagesReadCount;
+        metrics.cloudPagesPersistedCount = cloudPagesPersistedCount;
         return metrics;
     }
 
@@ -97,4 +104,16 @@
     public long getBufferCachePageReadCount() {
         return bufferCachePageReadCount;
     }
+
+    public long getCloudReadRequestsCount() {
+        return cloudReadRequestsCount;
+    }
+
+    public long getCloudPagesReadCount() {
+        return cloudPagesReadCount;
+    }
+
+    public long getCloudPagesPersistedCount() {
+        return cloudPagesPersistedCount;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
index 76ad2a4..a8a3135 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
@@ -40,7 +40,10 @@
         PROCESSED_OBJECTS_COUNT("processedObjects"),
         WARNING_COUNT("warningCount"),
         BUFFERCACHE_HIT_RATIO("bufferCacheHitRatio"),
-        BUFFERCACHE_PAGEREAD_COUNT("bufferCachePageReadCount");
+        BUFFERCACHE_PAGEREAD_COUNT("bufferCachePageReadCount"),
+        REMOTE_STORAGE_REQUESTS_COUNT("remoteStorageRequestsCount"),
+        REMOTE_STORAGE_PAGES_READ_COUNT("remoteStoragePagesReadCount"),
+        REMOTE_PAGES_PERSISTED_COUNT("remoteStoragePagesPersistedCount");
 
         private final String str;
 
@@ -87,6 +90,7 @@
         final boolean hasErrors = metrics.getErrorCount() > 0;
         final boolean hasWarnings = metrics.getWarnCount() > 0;
         final boolean usedCache = !(Double.isNaN(metrics.getBufferCacheHitRatio()));
+        final boolean madeCloudReadRequests = metrics.getCloudReadRequestsCount() > 0;
         ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(), metrics.getProcessedObjects(),
                 usedCache || hasWarnings || hasErrors);
         pw.print("\n");
@@ -100,6 +104,20 @@
                     hasWarnings || hasErrors);
             pw.print("\n");
         }
+        if (madeCloudReadRequests) {
+            pw.print("\t");
+            ResultUtil.printField(pw, Metrics.REMOTE_STORAGE_REQUESTS_COUNT.str(), metrics.getCloudReadRequestsCount(),
+                    true);
+            pw.print("\n");
+            pw.print("\t");
+            ResultUtil.printField(pw, Metrics.REMOTE_STORAGE_PAGES_READ_COUNT.str(), metrics.getCloudPagesReadCount(),
+                    true);
+            pw.print("\n");
+            pw.print("\t");
+            ResultUtil.printField(pw, Metrics.REMOTE_PAGES_PERSISTED_COUNT.str(), metrics.getCloudPagesPersistedCount(),
+                    true);
+            pw.print("\n");
+        }
         if (hasWarnings) {
             pw.print("\t");
             ResultUtil.printField(pw, Metrics.WARNING_COUNT.str(), metrics.getWarnCount(), hasErrors);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 340cd57..f60a99f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -5334,6 +5334,9 @@
         stats.setQueueWaitTime(resultMetadata.getQueueWaitTimeInNanos());
         stats.setBufferCacheHitRatio(resultMetadata.getBufferCacheHitRatio());
         stats.setBufferCachePageReadCount(resultMetadata.getBufferCachePageReadCount());
+        stats.setCloudReadRequestsCount(resultMetadata.getCloudReadRequestsCount());
+        stats.setCloudPagesReadCount(resultMetadata.getCloudPagesReadCount());
+        stats.setCloudPagesPersistedCount(resultMetadata.getCloudPagesPersistedCount());
         if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) {
             stats.setJobProfile(resultMetadata.getJobProfile());
             apiFramework.generateOptimizedLogicalPlanWithProfile(resultMetadata.getJobProfile());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
index 7770c4f..2dd7b41 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
@@ -57,6 +57,21 @@
     ICounter coldReadCounter();
 
     /**
+     * @return A counter used to track the number of request to fetch pages from object store
+     */
+    ICounter cloudReadRequestCounter();
+
+    /**
+     * @return A counter used to track the number of pages read from the cloud.
+     */
+    ICounter cloudReadPageCounter();
+
+    /**
+     * @return A counter used to track the number of pages, fetched form cloud gets persisted to disk
+     */
+    ICounter cloudPersistPageCounter();
+
+    /**
      * @return A counter used to set the average tuple size outputted by an operator
      */
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java
index d427d14..8219d2f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/NoOpOperatorStats.java
@@ -95,6 +95,21 @@
     }
 
     @Override
+    public ICounter cloudReadRequestCounter() {
+        return NOOP_COUNTER;
+    }
+
+    @Override
+    public ICounter cloudReadPageCounter() {
+        return NOOP_COUNTER;
+    }
+
+    @Override
+    public ICounter cloudPersistPageCounter() {
+        return NOOP_COUNTER;
+    }
+
+    @Override
     public ICounter getAverageTupleSz() {
         return NOOP_COUNTER;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
index 412b788..9dbac3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
@@ -36,6 +36,9 @@
     public final ICounter timeCounter;
     public final ICounter pageReads;
     public final ICounter coldReadCounter;
+    public final ICounter cloudReadRequestCounter;
+    public final ICounter cloudReadPageCounter;
+    public final ICounter cloudPersistPageCounter;
     public final ICounter avgTupleSz;
     public final ICounter minTupleSz;
     public final ICounter maxTupleSz;
@@ -57,6 +60,9 @@
         timeCounter = new Counter("timeCounter");
         pageReads = new Counter("diskIoCounter");
         coldReadCounter = new Counter("coldReadCounter");
+        cloudReadRequestCounter = new Counter("cloudReadRequestCounter");
+        cloudReadPageCounter = new Counter("cloudReadPageCounter");
+        cloudPersistPageCounter = new Counter("cloudPersistPageCounter");
         avgTupleSz = new Counter("avgTupleSz");
         minTupleSz = new Counter("minTupleSz");
         maxTupleSz = new Counter("maxTupleSz");
@@ -94,6 +100,21 @@
     }
 
     @Override
+    public ICounter cloudReadRequestCounter() {
+        return cloudReadRequestCounter;
+    }
+
+    @Override
+    public ICounter cloudReadPageCounter() {
+        return cloudReadPageCounter;
+    }
+
+    @Override
+    public ICounter cloudPersistPageCounter() {
+        return cloudPersistPageCounter;
+    }
+
+    @Override
     public ICounter getAverageTupleSz() {
         return avgTupleSz;
     }
@@ -171,6 +192,9 @@
         output.writeLong(timeCounter.get());
         output.writeLong(pageReads.get());
         output.writeLong(coldReadCounter.get());
+        output.writeLong(cloudReadRequestCounter.get());
+        output.writeLong(cloudReadPageCounter.get());
+        output.writeLong(cloudPersistPageCounter.get());
         output.writeLong(avgTupleSz.get());
         output.writeLong(minTupleSz.get());
         output.writeLong(maxTupleSz.get());
@@ -187,6 +211,9 @@
         timeCounter.set(input.readLong());
         pageReads.set(input.readLong());
         coldReadCounter.set(input.readLong());
+        cloudReadRequestCounter.set(input.readLong());
+        cloudReadPageCounter.set(input.readLong());
+        cloudPersistPageCounter.set(input.readLong());
         avgTupleSz.set(input.readLong());
         minTupleSz.set(input.readLong());
         maxTupleSz.set(input.readLong());
@@ -218,7 +245,10 @@
     public String toString() {
         return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"id\": \"" + operatorId + "\", " + "\""
                 + tupleCounter.getName() + "\": " + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": "
-                + timeCounter.get() + ", \"" + coldReadCounter.getName() + "\": " + coldReadCounter.get()
+                + timeCounter.get() + ", \"" + coldReadCounter.getName() + "\": " + coldReadCounter.get() + ", \""
+                + cloudReadRequestCounter.getName() + "\": " + cloudReadRequestCounter.get() + ", \""
+                + cloudReadPageCounter.getName() + "\": " + cloudReadPageCounter.get() + ", \""
+                + cloudPersistPageCounter.getName() + "\": " + cloudPersistPageCounter.get() + ", \""
                 + avgTupleSz.getName() + "\": " + avgTupleSz.get() + ", \"" + minTupleSz.getName() + "\": "
                 + minTupleSz.get() + ", \"" + minTupleSz.getName() + "\": " + timeCounter.get() + ", \""
                 + inputTupleCounter.getName() + "\": " + bytesRead.get() + ", \"" + bytesRead.getName() + "\": "
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudReadContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudReadContext.java
index 9c24ce1..dab23d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudReadContext.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
 import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.IThreadStats;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 
 /**
@@ -73,13 +74,15 @@
 
     @Override
     public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
-            CachedPage cPage) throws HyracksDataException {
-        return readAndPersistPage(ioManager, fileHandle, header, cPage, drive.hasSpace());
+            CachedPage cPage, IThreadStats threadStats) throws HyracksDataException {
+        return readAndPersistPage(ioManager, fileHandle, header, cPage, threadStats, drive.hasSpace());
     }
 
     public static ByteBuffer readAndPersistPage(IOManager ioManager, BufferedFileHandle fileHandle,
-            BufferCacheHeaderHelper header, CachedPage cPage, boolean persist) throws HyracksDataException {
-        ByteBuffer headerBuf = readAndPersistIfEmpty(ioManager, fileHandle.getFileHandle(), header, cPage, persist);
+            BufferCacheHeaderHelper header, CachedPage cPage, IThreadStats threadStats, boolean persist)
+            throws HyracksDataException {
+        ByteBuffer headerBuf =
+                readAndPersistIfEmpty(ioManager, fileHandle.getFileHandle(), header, cPage, threadStats, persist);
 
         cPage.setFrameSizeMultiplier(headerBuf.getInt(FRAME_MULTIPLIER_OFF));
         cPage.setExtraBlockPageId(headerBuf.getInt(EXTRA_BLOCK_PAGE_ID_OFF));
@@ -98,7 +101,8 @@
      * @return header buffer
      */
     private static ByteBuffer readAndPersistIfEmpty(IOManager ioManager, IFileHandle fileHandle,
-            BufferCacheHeaderHelper header, CachedPage cPage, boolean persist) throws HyracksDataException {
+            BufferCacheHeaderHelper header, CachedPage cPage, IThreadStats threadStats, boolean persist)
+            throws HyracksDataException {
         ByteBuffer headerBuf = header.getBuffer();
         if (BufferCacheCloudReadContextUtil.isEmpty(header)) {
             // header indicates the page is empty
@@ -108,10 +112,13 @@
             ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
             // Read pageZero from the cloud
             cloudIOManager.cloudRead(fileHandle, offset, headerBuf);
+            // accounting pageZero for cloud read
+            threadStats.cloudPageRead();
             headerBuf.flip();
 
             if (persist) {
                 BufferCacheCloudReadContextUtil.persist(cloudIOManager, fileHandle, headerBuf, offset);
+                threadStats.cloudPagePersist();
             }
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
index 7cfbda0..a5eca8e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
@@ -69,6 +69,9 @@
             aggregatedStats.getTimeCounter().update(stats.getTimeCounter().get());
             aggregatedStats.getPageReads().update(stats.getPageReads().get());
             aggregatedStats.coldReadCounter().update(stats.coldReadCounter().get());
+            aggregatedStats.cloudReadRequestCounter().update(stats.cloudReadRequestCounter().get());
+            aggregatedStats.cloudReadPageCounter().update(stats.cloudReadPageCounter().get());
+            aggregatedStats.cloudPersistPageCounter().update(stats.cloudPersistPageCounter().get());
         }
         return aggregatedStats;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 63ff2ed..91b87c6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -344,6 +344,9 @@
                 }
                 stats.getPageReads().update(ctx.getThreadStats().getPinnedPagesCount());
                 stats.coldReadCounter().update(ctx.getThreadStats().getColdReadCount());
+                stats.cloudReadRequestCounter().update(ctx.getThreadStats().getCloudReadRequestCount());
+                stats.cloudReadPageCounter().update(ctx.getThreadStats().getCloudPageReadCount());
+                stats.cloudPersistPageCounter().update(ctx.getThreadStats().getCloudPagePersistCount());
             } catch (Throwable th) { // NOSONAR Must ensure writer.fail is called.
                 // subsequently, the failure will be thrown
                 failure = th;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index 1ecc509..4cb717a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -43,6 +43,7 @@
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.IThreadStats;
 import org.apache.hyracks.util.annotations.NotThreadSafe;
 
 @NotThreadSafe
@@ -105,9 +106,9 @@
 
     @Override
     public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
-            CachedPage cPage) throws HyracksDataException {
+            CachedPage cPage, IThreadStats threadStats) throws HyracksDataException {
         // Page zero will be persisted (always) if free space permits
-        return readAndPersistPage(ioManager, fileHandle, header, cPage, drive.hasSpace());
+        return readAndPersistPage(ioManager, fileHandle, header, cPage, threadStats, drive.hasSpace());
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index 29c9467..7ad0029 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -42,6 +42,7 @@
 import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
 import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.IThreadStats;
 import org.apache.hyracks.util.annotations.NotThreadSafe;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -110,7 +111,7 @@
 
     @Override
     public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
-            CachedPage cPage) throws HyracksDataException {
+            CachedPage cPage, IThreadStats threadStats) throws HyracksDataException {
         boolean empty = BufferCacheCloudReadContextUtil.isEmpty(header);
         int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
         boolean cloudOnly = columnRanges.isCloudOnly(pageId);
@@ -129,7 +130,7 @@
              * 'cloudOnly' is true.
              */
             boolean persist = empty && !cloudOnly && !evictable && operation != MERGE && drive.isUnpressured();
-            readFromStream(ioManager, fileHandle, header, cPage, persist);
+            readFromStream(ioManager, fileHandle, header, cPage, persist, threadStats);
         } else {
             /*
              *  Here we can find a page that is planned for eviction, but it has not being evicted yet
@@ -140,7 +141,7 @@
         }
 
         // Finally process the header
-        return DEFAULT.processHeader(ioManager, fileHandle, header, cPage);
+        return DEFAULT.processHeader(ioManager, fileHandle, header, cPage, threadStats);
     }
 
     void unpinAll(IBufferCache bufferCache) throws HyracksDataException {
@@ -158,8 +159,8 @@
     }
 
     private void readFromStream(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
-            CachedPage cPage, boolean persist) throws HyracksDataException {
-        CloudInputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
+            CachedPage cPage, boolean persist, IThreadStats threadStats) throws HyracksDataException {
+        CloudInputStream stream = getOrCreateStream(ioManager, fileHandle, cPage, threadStats);
         ByteBuffer buffer = header.getBuffer();
         buffer.position(0);
 
@@ -171,7 +172,7 @@
         gapStream.skipTo(cPage.getCompressedPageOffset());
 
         // Get the page's data from the cloud
-        doStreamRead(stream, buffer);
+        doStreamRead(stream, buffer, threadStats);
 
         // Flip the buffer after reading to restore the correct position
         buffer.flip();
@@ -180,11 +181,12 @@
             long offset = cPage.getCompressedPageOffset();
             ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
             BufferCacheCloudReadContextUtil.persist(cloudIOManager, fileHandle.getFileHandle(), buffer, offset);
+            threadStats.cloudPagePersist();
         }
     }
 
-    private CloudInputStream getOrCreateStream(IOManager ioManager, BufferedFileHandle fileHandle, CachedPage cPage)
-            throws HyracksDataException {
+    private CloudInputStream getOrCreateStream(IOManager ioManager, BufferedFileHandle fileHandle, CachedPage cPage,
+            IThreadStats threadStats) throws HyracksDataException {
         if (gapStream != null) {
             return gapStream;
         }
@@ -196,6 +198,8 @@
         ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
         gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(), offset, length);
 
+        threadStats.cloudReadRequest();
+
         LOGGER.info(
                 "Cloud stream read for pageId={} starting from pageCounter={} out of "
                         + "numberOfContiguousPages={}. pageZeroId={} stream: {}",
@@ -204,10 +208,12 @@
         return gapStream;
     }
 
-    private void doStreamRead(CloudInputStream stream, ByteBuffer buffer) throws HyracksDataException {
+    private void doStreamRead(CloudInputStream stream, ByteBuffer buffer, IThreadStats threadStats)
+            throws HyracksDataException {
         int length = buffer.remaining();
         try {
             stream.read(buffer);
+            threadStats.cloudPageRead();
         } catch (Throwable th) {
             LOGGER.warn("Failed to READ {} bytes from stream {}", length, gapStream);
             throw HyracksDataException.create(th);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java
index 0e6f858..11275a4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/DefaultColumnReadContext.java
@@ -31,6 +31,7 @@
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.IThreadStats;
 
 public final class DefaultColumnReadContext implements IColumnReadContext {
     public static final IColumnReadContext INSTANCE = new DefaultColumnReadContext();
@@ -60,8 +61,8 @@
 
     @Override
     public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
-            CachedPage cPage) throws HyracksDataException {
-        return DEFAULT.processHeader(ioManager, fileHandle, header, cPage);
+            CachedPage cPage, IThreadStats threadStats) throws HyracksDataException {
+        return DEFAULT.processHeader(ioManager, fileHandle, header, cPage, threadStats);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java
index 7b51d55..1317adc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.IThreadStats;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 
 @ThreadSafe
@@ -61,8 +62,8 @@
 
     @Override
     public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
-            CachedPage cPage) throws HyracksDataException {
+            CachedPage cPage, IThreadStats threadStats) throws HyracksDataException {
         // Will not persist as the disk is pressured
-        return readAndPersistPage(ioManager, fileHandle, header, cPage, false);
+        return readAndPersistPage(ioManager, fileHandle, header, cPage, threadStats, false);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
index a0ad045..c211573 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
@@ -31,6 +31,7 @@
 import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
 import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
 import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
+import org.apache.hyracks.util.IThreadStats;
 import org.apache.hyracks.util.annotations.NotThreadSafe;
 
 /**
@@ -70,7 +71,8 @@
      * @param cPage   CachedPage in {@link BufferCache}
      * @param context read context
      */
-    public abstract void read(CachedPage cPage, IBufferCacheReadContext context) throws HyracksDataException;
+    public abstract void read(CachedPage cPage, IBufferCacheReadContext context, IThreadStats threadStats)
+            throws HyracksDataException;
 
     /**
      * Write the CachedPage into disk
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index d2a6fc0..c37bf98 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -52,6 +52,7 @@
 import org.apache.hyracks.storage.common.file.IFileMapManager;
 import org.apache.hyracks.util.IThreadStats;
 import org.apache.hyracks.util.IThreadStatsCollector;
+import org.apache.hyracks.util.NoOpThreadStats;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -184,8 +185,9 @@
         if (DEBUG) {
             pinSanityCheck(dpid);
         }
-        final IThreadStats threadStats = statsSubscribers.get(Thread.currentThread());
-        if (threadStats != null && context.incrementStats()) {
+        final IThreadStats threadStats =
+                statsSubscribers.getOrDefault(Thread.currentThread(), NoOpThreadStats.INSTANCE);
+        if (context.incrementStats()) {
             threadStats.pagePinned();
         }
         CachedPage cPage = findPage(dpid);
@@ -567,15 +569,16 @@
     private void read(CachedPage cPage, IBufferCacheReadContext context) throws HyracksDataException {
         BufferedFileHandle fInfo = getFileHandle(cPage);
         cPage.buffer.clear();
+        final IThreadStats threadStats =
+                statsSubscribers.getOrDefault(Thread.currentThread(), NoOpThreadStats.INSTANCE);
         try {
-            fInfo.read(cPage, context);
+            fInfo.read(cPage, context, threadStats);
         } catch (Throwable e) {
             LOGGER.error("Error while reading a page {} in file {}", cPage, fInfo);
             throw e;
         }
 
-        final IThreadStats threadStats = statsSubscribers.get(Thread.currentThread());
-        if (threadStats != null && context.incrementStats()) {
+        if (context.incrementStats()) {
             threadStats.coldRead();
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/IBufferCacheReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/IBufferCacheReadContext.java
index 3c2bbcd..c2f4921 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/IBufferCacheReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/IBufferCacheReadContext.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.IThreadStats;
 
 /**
  * Provide a context to {@link IBufferCache} pin/unpin operations as well as processing the header of the first
@@ -68,5 +69,5 @@
      * @return the byte buffer of the header after processing it
      */
     ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
-            CachedPage cPage) throws HyracksDataException;
+            CachedPage cPage, IThreadStats threadStats) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/read/AbstractBufferCacheReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/read/AbstractBufferCacheReadContext.java
index 42abd2e..927b437 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/read/AbstractBufferCacheReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/context/read/AbstractBufferCacheReadContext.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.IThreadStats;
 
 abstract class AbstractBufferCacheReadContext implements IBufferCacheReadContext {
     @Override
@@ -44,7 +45,7 @@
 
     @Override
     public final ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle,
-            BufferCacheHeaderHelper header, CachedPage cPage) {
+            BufferCacheHeaderHelper header, CachedPage cPage, IThreadStats threadStats) {
         ByteBuffer buf = header.getBuffer();
         cPage.setFrameSizeMultiplier(buf.getInt(FRAME_MULTIPLIER_OFF));
         cPage.setExtraBlockPageId(buf.getInt(EXTRA_BLOCK_PAGE_ID_OFF));
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
index ff2bd83..9704cdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
@@ -38,6 +38,7 @@
 import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
 import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
 import org.apache.hyracks.storage.common.compression.file.NoOpLAFWriter;
+import org.apache.hyracks.util.IThreadStats;
 
 public class BufferedFileHandle extends AbstractBufferedFileIOManager {
     private final int fileId;
@@ -71,7 +72,8 @@
     }
 
     @Override
-    public void read(CachedPage cPage, IBufferCacheReadContext context) throws HyracksDataException {
+    public void read(CachedPage cPage, IBufferCacheReadContext context, IThreadStats threadStats)
+            throws HyracksDataException {
         final BufferCacheHeaderHelper header = checkoutHeaderHelper();
         try {
             setPageInfo(cPage);
@@ -83,7 +85,7 @@
                 return;
             }
 
-            final ByteBuffer buf = context.processHeader(ioManager, this, header, cPage);
+            final ByteBuffer buf = context.processHeader(ioManager, this, header, cPage, threadStats);
             cPage.getBuffer().put(buf);
         } finally {
             returnHeaderHelper(header);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index cd882b5..6ad4d27 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -35,6 +35,7 @@
 import org.apache.hyracks.storage.common.compression.file.CompressedFileManager;
 import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
 import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
+import org.apache.hyracks.util.IThreadStats;
 
 public class CompressedBufferedFileHandle extends BufferedFileHandle {
     private final FileReference lafFileRef;
@@ -48,7 +49,8 @@
     }
 
     @Override
-    public void read(CachedPage cPage, IBufferCacheReadContext context) throws HyracksDataException {
+    public void read(CachedPage cPage, IBufferCacheReadContext context, IThreadStats threadStats)
+            throws HyracksDataException {
         final BufferCacheHeaderHelper header = checkoutHeaderHelper();
         try {
             compressedFileManager.setCompressedPageInfo(cPage);
@@ -59,7 +61,7 @@
                 return;
             }
 
-            final ByteBuffer cBuffer = context.processHeader(ioManager, this, header, cPage);
+            final ByteBuffer cBuffer = context.processHeader(ioManager, this, header, cPage, threadStats);
             final ByteBuffer uBuffer = cPage.getBuffer();
             fixBufferPointers(uBuffer, 0);
             if (cPage.getCompressedPageSize() < bufferCache.getPageSizeWithHeader()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IThreadStats.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IThreadStats.java
index e782a69..8cab823 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IThreadStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IThreadStats.java
@@ -43,4 +43,37 @@
      * @return the cold read count
      */
     long getColdReadCount();
+
+    /**
+     * Indicates that this thread made a cloud request to object storage
+     */
+    void cloudReadRequest();
+
+    /**
+     * Gets the count of cloud request to object storage
+     *
+     * @return the cloud request count
+     */
+    long getCloudReadRequestCount();
+
+    /**
+     * Indicates a page is read from the cloud
+     */
+    void cloudPageRead();
+
+    /**
+     * @return the count of pages read from the cloud
+     */
+    long getCloudPageReadCount();
+
+    /**
+     * Indicates the page is persistent in the disk,
+     * after fetching from cloud.
+     */
+    void cloudPagePersist();
+
+    /**
+     * @return the count of fetched page is persisted in the disk.
+     */
+    long getCloudPagePersistCount();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NoOpThreadStats.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NoOpThreadStats.java
new file mode 100644
index 0000000..c8050ca
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NoOpThreadStats.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+public class NoOpThreadStats implements IThreadStats {
+    public static final IThreadStats INSTANCE = new NoOpThreadStats();
+
+    private NoOpThreadStats() {
+
+    }
+
+    @Override
+    public void pagePinned() {
+        // do nothing
+    }
+
+    @Override
+    public long getPinnedPagesCount() {
+        return 0;
+    }
+
+    @Override
+    public void coldRead() {
+
+    }
+
+    @Override
+    public long getColdReadCount() {
+        return 0;
+    }
+
+    @Override
+    public void cloudReadRequest() {
+        // do nothing
+    }
+
+    @Override
+    public long getCloudReadRequestCount() {
+        return 0;
+    }
+
+    @Override
+    public void cloudPageRead() {
+        // do nothing
+    }
+
+    @Override
+    public long getCloudPageReadCount() {
+        return 0;
+    }
+
+    @Override
+    public void cloudPagePersist() {
+        // do nothing
+    }
+
+    @Override
+    public long getCloudPagePersistCount() {
+        return 0;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadStats.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadStats.java
index 9af3fb1..b5c426e 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadStats.java
@@ -25,8 +25,11 @@
 @ThreadSafe
 public class ThreadStats implements IThreadStats {
 
-    private AtomicLong pinnedPagesCount = new AtomicLong();
-    private AtomicLong coldReadCount = new AtomicLong();
+    private final AtomicLong pinnedPagesCount = new AtomicLong();
+    private final AtomicLong coldReadCount = new AtomicLong();
+    private final AtomicLong cloudReadRequestCount = new AtomicLong();
+    private final AtomicLong cloudReadPageCount = new AtomicLong();
+    private final AtomicLong cloudPersistPageCount = new AtomicLong();
 
     @Override
     public void pagePinned() {
@@ -47,4 +50,34 @@
     public void coldRead() {
         coldReadCount.incrementAndGet();
     }
+
+    @Override
+    public void cloudReadRequest() {
+        cloudReadRequestCount.incrementAndGet();
+    }
+
+    @Override
+    public long getCloudReadRequestCount() {
+        return cloudReadRequestCount.get();
+    }
+
+    @Override
+    public void cloudPageRead() {
+        cloudReadPageCount.incrementAndGet();
+    }
+
+    @Override
+    public long getCloudPageReadCount() {
+        return cloudReadPageCount.get();
+    }
+
+    @Override
+    public void cloudPagePersist() {
+        cloudPersistPageCount.incrementAndGet();
+    }
+
+    @Override
+    public long getCloudPagePersistCount() {
+        return cloudPersistPageCount.get();
+    }
 }