[ASTERIXDB-3085][IDX] Collect secondary index stats while ANALYZE DATASET

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
As part of collecting stats about a dataset using ANALYZE DATASET, also
collect stats about the dataset's secondary indexes and store them in the
sample index's metadata entry. Currently the collected stats consist of
the number of pages of each secondary index.
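
For context, a minimal sketch (hypothetical helper, not part of this change) of
how the collected stats can be read back from the operator stats once the
sampling job completes; DatasetStreamStats, IndexStats and getIndexesStats()
are introduced by the patch below:

    import java.util.Map;

    import org.apache.asterix.runtime.operators.DatasetStreamStats;
    import org.apache.hyracks.api.job.profiling.IOperatorStats;
    import org.apache.hyracks.api.job.profiling.IndexStats;

    // Hedged sketch: consuming the stats emitted by DatasetStreamStatsOperatorDescriptor.
    static void printDatasetStats(IOperatorStats opStats) {
        DatasetStreamStats stats = new DatasetStreamStats(opStats);
        System.out.println("cardinality=" + stats.getCardinality()
                + ", avgTupleSize=" + stats.getAvgTupleSize());
        for (Map.Entry<String, IndexStats> e : stats.getIndexesStats().entrySet()) {
            // one entry per secondary index, e.g. "ds1_idx1" -> number of pages
            System.out.println(e.getKey() + " numPages=" + e.getValue().getNumPages());
        }
    }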

Backport changes:
- Compensate for this branch not having the additional profiler stats that were added before this change

Change-Id: I800ed3015832c311c7075f7cc8c5325b2fc62265
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17277
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17335
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index fa5507f..b017648 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -211,7 +211,7 @@
 import org.apache.asterix.runtime.fulltext.FullTextConfigDescriptor;
 import org.apache.asterix.runtime.fulltext.IFullTextFilterDescriptor;
 import org.apache.asterix.runtime.fulltext.StopwordsFullTextFilterDescriptor;
-import org.apache.asterix.runtime.operators.StreamStats;
+import org.apache.asterix.runtime.operators.DatasetStreamStats;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
 import org.apache.asterix.translator.ClientRequest;
@@ -4266,9 +4266,9 @@
             int sampleCardinalityTarget = stmtAnalyze.getSampleSize();
             long sampleSeed = stmtAnalyze.getOrCreateSampleSeed();
 
-            Index.SampleIndexDetails newIndexDetailsPendingAdd =
-                    new Index.SampleIndexDetails(dsDetails.getPrimaryKey(), dsDetails.getKeySourceIndicator(),
-                            dsDetails.getPrimaryKeyType(), sampleCardinalityTarget, 0, 0, sampleSeed);
+            Index.SampleIndexDetails newIndexDetailsPendingAdd = new Index.SampleIndexDetails(dsDetails.getPrimaryKey(),
+                    dsDetails.getKeySourceIndicator(), dsDetails.getPrimaryKeyType(), sampleCardinalityTarget, 0, 0,
+                    sampleSeed, Collections.emptyMap());
             newIndexPendingAdd = new Index(dataverseName, datasetName, newIndexName, sampleIndexType,
                     newIndexDetailsPendingAdd, false, false, MetadataUtil.PENDING_ADD_OP);
 
@@ -4306,11 +4306,11 @@
             if (opStats == null || opStats.size() == 0) {
                 throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, "", sourceLoc);
             }
-            StreamStats stats = new StreamStats(opStats.get(0));
+            DatasetStreamStats stats = new DatasetStreamStats(opStats.get(0));
 
             Index.SampleIndexDetails newIndexDetailsFinal = new Index.SampleIndexDetails(dsDetails.getPrimaryKey(),
                     dsDetails.getKeySourceIndicator(), dsDetails.getPrimaryKeyType(), sampleCardinalityTarget,
-                    stats.getCardinality(), stats.getAvgTupleSize(), sampleSeed);
+                    stats.getCardinality(), stats.getAvgTupleSize(), sampleSeed, stats.getIndexesStats());
             Index newIndexFinal = new Index(dataverseName, datasetName, newIndexName, sampleIndexType,
                     newIndexDetailsFinal, false, false, MetadataUtil.PENDING_NO_OP);
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.01.ddl.sqlpp
new file mode 100644
index 0000000..8c374a2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.01.ddl.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Test collecting secondary indexes stats with ANALYZE DATASET statement
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE testType AS OPEN {
+  id : uuid
+};
+
+CREATE DATASET ds1(testType) PRIMARY KEY id AUTOGENERATED;
+CREATE DATASET ds2(testType) PRIMARY KEY id AUTOGENERATED;
+CREATE DATASET ds3(testType) PRIMARY KEY id AUTOGENERATED;
+CREATE DATASET ds4(testType) PRIMARY KEY id AUTOGENERATED;
+
+CREATE INDEX ds1_idx1 ON ds1(name: string);
+CREATE INDEX ds1_idx2 ON ds1(UNNEST interests: string) EXCLUDE UNKNOWN KEY;
+CREATE PRIMARY INDEX ds1_idx3 ON ds1;
+
+CREATE INDEX ds2_idx1 ON ds2(name: string);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.02.update.sqlpp
new file mode 100644
index 0000000..711b914
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.02.update.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+LOAD DATASET ds1 USING localfs (("path"="asterix_nc1://data/semistructured/co1k_olist/customer.adm"),("format"="adm"));
+LOAD DATASET ds2 USING localfs (("path"="asterix_nc1://data/semistructured/co1k_olist/customer.adm"),("format"="adm"));
+LOAD DATASET ds3 USING localfs (("path"="asterix_nc1://data/semistructured/co1k_olist/customer.adm"),("format"="adm"));
+LOAD DATASET ds4 USING localfs (("path"="asterix_nc1://data/semistructured/co1k_olist/customer.adm"),("format"="adm"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.03.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.03.ddl.sqlpp
new file mode 100644
index 0000000..97e07ba
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.03.ddl.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+ANALYZE DATASET ds1;
+ANALYZE DATASET ds2;
+ANALYZE DATASET ds3;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.query.sqlpp
new file mode 100644
index 0000000..9c01681
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM `Metadata`.`Index` t WHERE t.IndexStructure = "SAMPLE"
+SELECT t.* EXCLUDE DataverseName, SearchKey, IsPrimary, Timestamp, PendingOp, SampleSeed
+ORDER BY t.DatasetName, t.IndexName;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.05.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.05.ddl.sqlpp
new file mode 100644
index 0000000..4a2edf0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.05.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+DROP INDEX ds1.ds1_idx3;
+ANALYZE DATASET ds1;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.query.sqlpp
new file mode 100644
index 0000000..9c01681
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM `Metadata`.`Index` t WHERE t.IndexStructure = "SAMPLE"
+SELECT t.* EXCLUDE DataverseName, SearchKey, IsPrimary, Timestamp, PendingOp, SampleSeed
+ORDER BY t.DatasetName, t.IndexName;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.99.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.99.ddl.sqlpp
new file mode 100644
index 0000000..36b2bab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.99.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.adm
new file mode 100644
index 0000000..bc2792b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.adm
@@ -0,0 +1,3 @@
+{ "DatasetName": "ds1", "IndexName": "sample_idx_1_ds1", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369, "IndexesStats": [ { "IndexName": "ds1_idx3", "NumPages": 8 }, { "IndexName": "ds1_idx2", "NumPages": 8 }, { "IndexName": "ds1_idx1", "NumPages": 8 } ] }
+{ "DatasetName": "ds2", "IndexName": "sample_idx_1_ds2", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369, "IndexesStats": [ { "IndexName": "ds2_idx1", "NumPages": 8 } ] }
+{ "DatasetName": "ds3", "IndexName": "sample_idx_1_ds3", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.adm
new file mode 100644
index 0000000..08f8659
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.adm
@@ -0,0 +1,3 @@
+{ "DatasetName": "ds1", "IndexName": "sample_idx_2_ds1", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369, "IndexesStats": [ { "IndexName": "ds1_idx2", "NumPages": 8 }, { "IndexName": "ds1_idx1", "NumPages": 8 } ] }
+{ "DatasetName": "ds2", "IndexName": "sample_idx_1_ds2", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369, "IndexesStats": [ { "IndexName": "ds2_idx1", "NumPages": 8 } ] }
+{ "DatasetName": "ds3", "IndexName": "sample_idx_1_ds3", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index f84472e..c87f368 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -46,9 +46,13 @@
     private StoragePathUtil() {
     }
 
+    public static IFileSplitProvider splitProvider(FileSplit[] splits) {
+        return new ConstantFileSplitProvider(splits);
+    }
+
     public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
             FileSplit[] splits) {
-        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
+        IFileSplitProvider splitProvider = splitProvider(splits);
         String[] loc = new String[splits.length];
         for (int p = 0; p < splits.length; p++) {
             loc[p] = splits[p].getNodeName();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 712a216..53cf3d0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -26,11 +26,13 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -1830,6 +1832,24 @@
         return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
     }
 
+    public List<Pair<IFileSplitProvider, String>> getSplitProviderOfAllIndexes(Dataset ds) throws AlgebricksException {
+        List<Index> dsIndexes = getDatasetIndexes(ds.getDataverseName(), ds.getDatasetName()).stream()
+                .filter(idx -> idx.getIndexType() != IndexType.SAMPLE && idx.isSecondaryIndex())
+                .collect(Collectors.toList());
+        if (dsIndexes.isEmpty()) {
+            return Collections.emptyList();
+        }
+        List<String> datasetNodes = findNodes(ds.getNodeGroupName());
+        List<Pair<IFileSplitProvider, String>> indexesSplits =
+                dsIndexes.stream()
+                        .map(idx -> new Pair<>(
+                                StoragePathUtil.splitProvider(SplitsAndConstraintsUtil.getIndexSplits(
+                                        appCtx.getClusterStateManager(), ds, idx.getIndexName(), datasetNodes)),
+                                idx.getIndexName()))
+                        .collect(Collectors.toList());
+        return indexesSplits;
+    }
+
     public LockList getLocks() {
         return locks;
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
index 21d2aaa..bbccd65 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
@@ -22,6 +22,7 @@
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -39,6 +40,7 @@
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.job.profiling.IndexStats;
 import org.apache.hyracks.util.OptionalBoolean;
 
 /**
@@ -554,10 +556,11 @@
         private final int sourceAvgItemSize;
 
         private final long sampleSeed;
+        private final Map<String, IndexStats> indexesStats;
 
         public SampleIndexDetails(List<List<String>> keyFieldNames, List<Integer> keyFieldSourceIndicators,
                 List<IAType> keyFieldTypes, int sampleCardinalityTarget, long sourceCardinality, int sourceAvgItemSize,
-                long sampleSeed) {
+                long sampleSeed, Map<String, IndexStats> indexesStats) {
             this.keyFieldNames = keyFieldNames;
             this.keyFieldSourceIndicators = keyFieldSourceIndicators;
             this.keyFieldTypes = keyFieldTypes;
@@ -565,6 +568,7 @@
             this.sourceCardinality = sourceCardinality;
             this.sourceAvgItemSize = sourceAvgItemSize;
             this.sampleSeed = sampleSeed;
+            this.indexesStats = indexesStats;
         }
 
         @Override
@@ -604,6 +608,10 @@
         public long getSampleSeed() {
             return sampleSeed;
         }
+
+        public Map<String, IndexStats> getIndexesStats() {
+            return indexesStats;
+        }
     }
 
     @Deprecated
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
index 9c742ed..fc78d10 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
@@ -25,7 +25,9 @@
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.builders.IARecordBuilder;
@@ -49,6 +51,7 @@
 import org.apache.asterix.om.base.AInt32;
 import org.apache.asterix.om.base.AInt64;
 import org.apache.asterix.om.base.AInt8;
+import org.apache.asterix.om.base.AMutableInt64;
 import org.apache.asterix.om.base.AMutableInt8;
 import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.base.AOrderedList;
@@ -67,6 +70,7 @@
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IndexStats;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.util.OptionalBoolean;
@@ -95,6 +99,9 @@
     public static final String SAMPLE_CARDINALITY_TARGET = "SampleCardinalityTarget";
     public static final String SOURCE_CARDINALITY = "SourceCardinality";
     public static final String SOURCE_AVG_ITEM_SIZE = "SourceAvgItemSize";
+    public static final String INDEXES_STATS = "IndexesStats";
+    public static final String STATS_NUM_PAGES = "NumPages";
+    public static final String STATS_INDEX_NAME = "IndexName";
 
     protected final TxnId txnId;
     protected final MetadataNode metadataNode;
@@ -105,13 +112,15 @@
     protected OrderedListBuilder complexSearchKeyNameListBuilder;
     protected IARecordBuilder complexSearchKeyNameRecordBuilder;
     protected IARecordBuilder castRecordBuilder;
+    protected OrderedListBuilder indexesStatsListBuilder;
+    protected IARecordBuilder indexStatsRecordBuilder;
     protected AOrderedListType stringList;
     protected AOrderedListType int8List;
     protected ArrayBackedValueStorage nameValue;
     protected ArrayBackedValueStorage itemValue;
     protected AMutableInt8 aInt8;
+    protected AMutableInt64 aInt64;
     protected ISerializerDeserializer<AInt8> int8Serde;
-    protected ISerializerDeserializer<ANull> nullSerde;
 
     @SuppressWarnings("unchecked")
     protected IndexTupleTranslator(TxnId txnId, MetadataNode metadataNode, boolean getTuple) {
@@ -124,14 +133,16 @@
             primaryKeyListBuilder = new OrderedListBuilder();
             complexSearchKeyNameRecordBuilder = new RecordBuilder();
             castRecordBuilder = new RecordBuilder();
+            indexesStatsListBuilder = new OrderedListBuilder();
+            indexStatsRecordBuilder = new RecordBuilder();
             complexSearchKeyNameListBuilder = new OrderedListBuilder();
             stringList = new AOrderedListType(BuiltinType.ASTRING, null);
             int8List = new AOrderedListType(BuiltinType.AINT8, null);
             nameValue = new ArrayBackedValueStorage();
             itemValue = new ArrayBackedValueStorage();
             aInt8 = new AMutableInt8((byte) 0);
+            aInt64 = new AMutableInt64(0);
             int8Serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
-            nullSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
         }
     }
 
@@ -490,8 +501,26 @@
                 }
                 int sourceAvgItemSize = ((AInt32) indexRecord.getValueByPos(sourceAvgItemSizePos)).getIntegerValue();
 
+                int indexesStatsPos = indexRecord.getType().getFieldIndex(INDEXES_STATS);
+                Map<String, IndexStats> indexesStats;
+                if (indexesStatsPos >= 0) {
+                    AOrderedList indexesStatsList = (AOrderedList) indexRecord.getValueByPos(indexesStatsPos);
+                    int numIndexes = indexesStatsList.size();
+                    indexesStats = numIndexes > 0 ? new HashMap<>() : Collections.emptyMap();
+                    for (int i = 0; i < numIndexes; i++) {
+                        ARecord stats = (ARecord) indexesStatsList.getItem(i);
+                        IAObject numPages = stats.getValueByPos(stats.getType().getFieldIndex(STATS_NUM_PAGES));
+                        IAObject idxNameObj = stats.getValueByPos(stats.getType().getFieldIndex(STATS_INDEX_NAME));
+                        String idxName = ((AString) idxNameObj).getStringValue();
+                        IndexStats idxStats = new IndexStats(idxName, ((AInt64) numPages).getLongValue());
+                        indexesStats.put(idxName, idxStats);
+                    }
+                } else {
+                    indexesStats = Collections.emptyMap();
+                }
+
                 indexDetails = new Index.SampleIndexDetails(keyFieldNames, keyFieldSourceIndicator, keyFieldTypes,
-                        sampleCardinalityTarget, sourceCardinality, sourceAvgItemSize, sampleSeed);
+                        sampleCardinalityTarget, sourceCardinality, sourceAvgItemSize, sampleSeed, indexesStats);
                 break;
             default:
                 throw new AsterixException(ErrorCode.METADATA_ERROR, indexType.toString());
@@ -935,6 +964,41 @@
             stringSerde.serialize(aString, nameValue.getDataOutput());
             int32Serde.serialize(new AInt32(indexDetails.getSourceAvgItemSize()), fieldValue.getDataOutput());
             recordBuilder.addField(nameValue, fieldValue);
+
+            Map<String, IndexStats> indexesStats = indexDetails.getIndexesStats();
+            if (!indexesStats.isEmpty()) {
+                indexesStatsListBuilder.reset(AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE);
+                for (Map.Entry<String, IndexStats> stats : indexesStats.entrySet()) {
+                    indexStatsRecordBuilder.reset(RecordUtil.FULLY_OPEN_RECORD_TYPE);
+                    // index name
+                    nameValue.reset();
+                    itemValue.reset();
+                    aString.setValue(STATS_INDEX_NAME);
+                    stringSerde.serialize(aString, nameValue.getDataOutput());
+                    aString.setValue(stats.getKey());
+                    stringSerde.serialize(aString, itemValue.getDataOutput());
+                    indexStatsRecordBuilder.addField(nameValue, itemValue);
+
+                    // index number of pages
+                    nameValue.reset();
+                    itemValue.reset();
+                    aString.setValue(STATS_NUM_PAGES);
+                    stringSerde.serialize(aString, nameValue.getDataOutput());
+                    aInt64.setValue(stats.getValue().getNumPages());
+                    int64Serde.serialize(aInt64, itemValue.getDataOutput());
+                    indexStatsRecordBuilder.addField(nameValue, itemValue);
+
+                    itemValue.reset();
+                    indexStatsRecordBuilder.write(itemValue.getDataOutput(), true);
+                    indexesStatsListBuilder.addItem(itemValue);
+                }
+                nameValue.reset();
+                fieldValue.reset();
+                aString.setValue(INDEXES_STATS);
+                stringSerde.serialize(aString, nameValue.getDataOutput());
+                indexesStatsListBuilder.write(fieldValue.getDataOutput(), true);
+                recordBuilder.addField(nameValue, fieldValue);
+            }
         }
     }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
index 0d3e015..056a8c2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.metadata.utils;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -36,8 +37,8 @@
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.runtime.aggregates.collections.FirstElementEvalFactory;
 import org.apache.asterix.runtime.evaluators.comparisons.GreaterThanDescriptor;
+import org.apache.asterix.runtime.operators.DatasetStreamStatsOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
-import org.apache.asterix.runtime.operators.StreamStatsOperatorDescriptor;
 import org.apache.asterix.runtime.runningaggregates.std.SampleSlotRunningAggregateFunctionFactory;
 import org.apache.asterix.runtime.runningaggregates.std.TidRunningAggregateDescriptor;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
@@ -82,6 +83,7 @@
 import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.storage.common.IStorageManager;
 
 /**
  * Utility class for sampling operations.
@@ -164,10 +166,10 @@
         for (int i = 0; i < nFields; i++) {
             columns[i] = i;
         }
-
+        IStorageManager storageMgr = metadataProvider.getStorageComponentProvider().getStorageManager();
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
-                metadataProvider.getStorageComponentProvider().getStorageManager(), fileSplitProvider);
+        IIndexDataflowHelperFactory dataflowHelperFactory =
+                new IndexDataflowHelperFactory(storageMgr, fileSplitProvider);
 
         // job spec:
         IndexUtil.bindJobEventListener(spec, metadataProvider);
@@ -179,7 +181,16 @@
         sourceOp = targetOp;
 
         // primary index scan ----> stream stats op
-        targetOp = new StreamStatsOperatorDescriptor(spec, recordDesc, DATASET_STATS_OPERATOR_NAME);
+        List<Pair<IFileSplitProvider, String>> indexesInfo = metadataProvider.getSplitProviderOfAllIndexes(dataset);
+        IndexDataflowHelperFactory[] indexes = new IndexDataflowHelperFactory[indexesInfo.size()];
+        String[] names = new String[indexesInfo.size()];
+        for (int i = 0; i < indexes.length; i++) {
+            Pair<IFileSplitProvider, String> indexInfo = indexesInfo.get(i);
+            indexes[i] = new IndexDataflowHelperFactory(storageMgr, indexInfo.first);
+            names[i] = indexInfo.second;
+        }
+        targetOp =
+                new DatasetStreamStatsOperatorDescriptor(spec, recordDesc, DATASET_STATS_OPERATOR_NAME, indexes, names);
         spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
         sourceOp = targetOp;
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStats.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStats.java
similarity index 73%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStats.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStats.java
index 268009f..8ea267a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStats.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStats.java
@@ -19,26 +19,33 @@
 
 package org.apache.asterix.runtime.operators;
 
+import java.util.Map;
+
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.IndexStats;
 
 /**
- * Helper method to access stats produced by {@link org.apache.asterix.runtime.operators.StreamStatsOperatorDescriptor}
+ * Helper class to access stats produced by {@link DatasetStreamStatsOperatorDescriptor}
  */
-public final class StreamStats {
+public final class DatasetStreamStats {
 
     private final long cardinality;
 
     private final int avgTupleSize;
 
-    public StreamStats(IOperatorStats opStats) {
+    private final Map<String, IndexStats> indexesStats;
+
+    public DatasetStreamStats(IOperatorStats opStats) {
         this.cardinality = opStats.getTupleCounter().get();
         long totalTupleSize = opStats.getDiskIoCounter().get();
         this.avgTupleSize = cardinality > 0 ? (int) (totalTupleSize / cardinality) : 0;
+        this.indexesStats = opStats.getIndexesStats();
     }
 
-    static void update(IOperatorStats opStats, long tupleCount, long tupleSize) {
+    static void update(IOperatorStats opStats, long tupleCount, long tupleSize, Map<String, IndexStats> indexStats) {
         opStats.getTupleCounter().update(tupleCount);
         opStats.getDiskIoCounter().update(tupleSize);
+        opStats.updateIndexesStats(indexStats);
     }
 
     public long getCardinality() {
@@ -48,4 +55,8 @@
     public int getAvgTupleSize() {
         return avgTupleSize;
     }
+
+    public Map<String, IndexStats> getIndexesStats() {
+        return indexesStats;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
new file mode 100644
index 0000000..ba29450
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.operators;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.api.job.profiling.IndexStats;
+import org.apache.hyracks.api.job.profiling.OperatorStats;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.AbstractLSMWithBloomFilterDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+/**
+ * Computes the total tuple count and total tuple length of all input tuples,
+ * collects the number of pages of the given secondary indexes, and emits these
+ * values as operator stats.
+ */
+public final class DatasetStreamStatsOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String operatorName;
+    private final IIndexDataflowHelperFactory[] indexes;
+    private final String[] indexesNames;
+    private Map<String, IndexStats> indexStats;
+
+    public DatasetStreamStatsOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
+            String operatorName, IIndexDataflowHelperFactory[] indexes, String[] indexesNames) {
+        super(spec, 1, 1);
+        outRecDescs[0] = rDesc;
+        this.operatorName = operatorName;
+        this.indexes = indexes;
+        this.indexesNames = indexesNames;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+
+            private FrameTupleAccessor fta;
+            private long totalTupleCount;
+            private long totalTupleLength;
+
+            @Override
+            public void open() throws HyracksDataException {
+                fta = new FrameTupleAccessor(outRecDescs[0]);
+                totalTupleCount = 0;
+                writer.open();
+                IStatsCollector coll = ctx.getStatsCollector();
+                if (coll != null) {
+                    coll.add(new OperatorStats(operatorName));
+                }
+                INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
+                indexStats = new HashMap<>();
+                for (int i = 0; i < indexes.length; i++) {
+                    IIndexDataflowHelper idxFlowHelper = indexes[i].create(serviceCtx, partition);
+                    try {
+                        idxFlowHelper.open();
+                        ILSMIndex indexInstance = (ILSMIndex) idxFlowHelper.getIndexInstance();
+                        long numPages = 0;
+                        synchronized (indexInstance.getOperationTracker()) {
+                            for (ILSMDiskComponent component : indexInstance.getDiskComponents()) {
+                                long componentSize = component.getComponentSize();
+                                if (component instanceof AbstractLSMWithBloomFilterDiskComponent) {
+                                    componentSize -= ((AbstractLSMWithBloomFilterDiskComponent) component)
+                                            .getBloomFilter().getFileReference().getFile().length();
+                                }
+                                numPages += componentSize / indexInstance.getBufferCache().getPageSize();
+                            }
+                        }
+                        indexStats.put(indexesNames[i], new IndexStats(indexesNames[i], numPages));
+                    } finally {
+                        idxFlowHelper.close();
+                    }
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                fta.reset(buffer);
+                computeStats();
+                FrameUtils.flushFrame(buffer, writer);
+            }
+
+            private void computeStats() {
+                int n = fta.getTupleCount();
+                totalTupleCount += n;
+                for (int i = 0; i < n; i++) {
+                    totalTupleLength += fta.getTupleLength(i);
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                IStatsCollector statsCollector = ctx.getStatsCollector();
+                if (statsCollector != null) {
+                    IOperatorStats stats = statsCollector.getOrAddOperatorStats(operatorName);
+                    DatasetStreamStats.update(stats, totalTupleCount, totalTupleLength, indexStats);
+                }
+                writer.close();
+            }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                writer.flush();
+            }
+
+            @Override
+            public String getDisplayName() {
+                return operatorName;
+            }
+        };
+    }
+}
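
A note on the page-count estimate above: for each secondary index, the operator
sums the sizes of the index's on-disk LSM components, subtracts the bloom filter
file sizes, and divides by the buffer-cache page size. A minimal standalone
sketch of that calculation (hypothetical helper name; the operator above inlines
it in open()):

    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.storage.am.lsm.common.api.AbstractLSMWithBloomFilterDiskComponent;
    import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
    import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;

    // Hedged sketch: estimate an LSM index's page count from its disk components.
    static long estimateNumPages(ILSMIndex index) throws HyracksDataException {
        long numPages = 0;
        int pageSize = index.getBufferCache().getPageSize();
        synchronized (index.getOperationTracker()) {
            for (ILSMDiskComponent component : index.getDiskComponents()) {
                long componentSize = component.getComponentSize();
                if (component instanceof AbstractLSMWithBloomFilterDiskComponent) {
                    // exclude the bloom filter file from the component size
                    componentSize -= ((AbstractLSMWithBloomFilterDiskComponent) component)
                            .getBloomFilter().getFileReference().getFile().length();
                }
                numPages += componentSize / pageSize;
            }
        }
        return numPages;
    }
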
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStatsOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStatsOperatorDescriptor.java
deleted file mode 100644
index 353401a..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStatsOperatorDescriptor.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.runtime.operators;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.profiling.IOperatorStats;
-import org.apache.hyracks.api.job.profiling.IStatsCollector;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-
-/**
- * Computes total tuple count and total tuple length for all input tuples,
- * and emits these values as operator stats.
- */
-public final class StreamStatsOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
-    private static final long serialVersionUID = 1L;
-
-    private final String operatorName;
-
-    public StreamStatsOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
-            String operatorName) {
-        super(spec, 1, 1);
-        outRecDescs[0] = rDesc;
-        this.operatorName = operatorName;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-
-        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-
-            private FrameTupleAccessor fta;
-            private long totalTupleCount;
-            private long totalTupleLength;
-
-            @Override
-            public void open() throws HyracksDataException {
-                fta = new FrameTupleAccessor(outRecDescs[0]);
-                totalTupleCount = 0;
-                writer.open();
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                fta.reset(buffer);
-                computeStats();
-                FrameUtils.flushFrame(buffer, writer);
-            }
-
-            private void computeStats() {
-                int n = fta.getTupleCount();
-                totalTupleCount += n;
-                for (int i = 0; i < n; i++) {
-                    totalTupleLength += fta.getTupleLength(i);
-                }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                writer.fail();
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                IStatsCollector statsCollector = ctx.getStatsCollector();
-                if (statsCollector != null) {
-                    IOperatorStats stats = statsCollector.getOrAddOperatorStats(operatorName);
-                    StreamStats.update(stats, totalTupleCount, totalTupleLength);
-                }
-                writer.close();
-            }
-
-            @Override
-            public void flush() throws HyracksDataException {
-                writer.flush();
-            }
-
-            @Override
-            public String getDisplayName() {
-                return operatorName;
-            }
-        };
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
index 181249a..0d38fac 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.api.job.profiling;
 
 import java.io.Serializable;
+import java.util.Map;
 
 import org.apache.hyracks.api.io.IWritable;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
@@ -46,4 +47,10 @@
      * @return A counter used to track the number of pages pinned by an operator
      */
     ICounter getDiskIoCounter();
+
+    void updateIndexesStats(Map<String, IndexStats> indexesStats);
+
+    Map<String, IndexStats> getIndexesStats();
+
+    void updateFrom(IOperatorStats stats);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IndexStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IndexStats.java
new file mode 100644
index 0000000..0c471ef
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IndexStats.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job.profiling;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
+import org.apache.hyracks.api.io.IWritable;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
+
+public class IndexStats implements IWritable, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final ICounter numPages;
+    private String indexName;
+
+    public IndexStats(String indexName, long numPages) {
+        this.indexName = indexName;
+        this.numPages = new Counter("numPages");
+        this.numPages.set(numPages);
+    }
+
+    public static IndexStats create(DataInput input) throws IOException {
+        String indexName = input.readUTF();
+        long numPages = input.readLong();
+        return new IndexStats(indexName, numPages);
+    }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeUTF(indexName);
+        output.writeLong(numPages.get());
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        indexName = input.readUTF();
+        numPages.set(input.readLong());
+    }
+
+    public void updateNumPages(long delta) {
+        numPages.update(delta);
+    }
+
+    public long getNumPages() {
+        return numPages.get();
+    }
+
+    @Override
+    public String toString() {
+        return "IndexStats{indexName='" + indexName + "', numPages=" + numPages.get() + '}';
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
index 08c1adc..6b7deb6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
@@ -21,6 +21,8 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hyracks.api.com.job.profiling.counters.Counter;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
@@ -32,6 +34,7 @@
     public final ICounter tupleCounter;
     public final ICounter timeCounter;
     public final ICounter diskIoCounter;
+    private final Map<String, IndexStats> indexesStats;
 
     public OperatorStats(String operatorName) {
         if (operatorName == null || operatorName.isEmpty()) {
@@ -41,6 +44,7 @@
         tupleCounter = new Counter("tupleCounter");
         timeCounter = new Counter("timeCounter");
         diskIoCounter = new Counter("diskIoCounter");
+        indexesStats = new HashMap<>();
     }
 
     public static IOperatorStats create(DataInput input) throws IOException {
@@ -71,11 +75,42 @@
     }
 
     @Override
+    public void updateIndexesStats(Map<String, IndexStats> stats) {
+        if (stats == null) {
+            return;
+        }
+        for (Map.Entry<String, IndexStats> stat : stats.entrySet()) {
+            String indexName = stat.getKey();
+            IndexStats indexStat = stat.getValue();
+            IndexStats existingIndexStat = indexesStats.get(indexName);
+            if (existingIndexStat == null) {
+                indexesStats.put(indexName, new IndexStats(indexName, indexStat.getNumPages()));
+            } else {
+                existingIndexStat.updateNumPages(indexStat.getNumPages());
+            }
+        }
+    }
+
+    @Override
+    public Map<String, IndexStats> getIndexesStats() {
+        return indexesStats;
+    }
+
+    @Override
+    public void updateFrom(IOperatorStats stats) {
+        tupleCounter.update(stats.getTupleCounter().get());
+        timeCounter.update(stats.getTimeCounter().get());
+        diskIoCounter.update(stats.getDiskIoCounter().get());
+        updateIndexesStats(stats.getIndexesStats());
+    }
+
+    @Override
     public void writeFields(DataOutput output) throws IOException {
         output.writeUTF(operatorName);
         output.writeLong(tupleCounter.get());
         output.writeLong(timeCounter.get());
         output.writeLong(diskIoCounter.get());
+        writeIndexesStats(output);
     }
 
     @Override
@@ -83,11 +118,30 @@
         tupleCounter.set(input.readLong());
         timeCounter.set(input.readLong());
         diskIoCounter.set(input.readLong());
+        readIndexesStats(input);
+    }
+
+    private void writeIndexesStats(DataOutput output) throws IOException {
+        output.writeInt(indexesStats.size());
+        for (Map.Entry<String, IndexStats> indexStat : indexesStats.entrySet()) {
+            output.writeUTF(indexStat.getKey());
+            indexStat.getValue().writeFields(output);
+        }
+    }
+
+    private void readIndexesStats(DataInput input) throws IOException {
+        int numIndexes = input.readInt();
+        for (int i = 0; i < numIndexes; i++) {
+            String indexName = input.readUTF();
+            IndexStats indexStats = IndexStats.create(input);
+            indexesStats.put(indexName, indexStats);
+        }
     }
 
     @Override
     public String toString() {
         return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"" + tupleCounter.getName() + "\": "
-                + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": " + timeCounter.get() + " }";
+                + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": " + timeCounter.get()
+                + ", \"indexesStats\": \"" + indexesStats + "\" }";
     }
 }
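
For context, a minimal sketch (hypothetical driver, not part of the patch) of how
the new updateFrom()/updateIndexesStats() aggregation behaves: counters are summed
and per-index NumPages values are accumulated when the same index is reported by
several tasks, which is how JobProfile below merges per-task stats:

    import org.apache.hyracks.api.job.profiling.IOperatorStats;
    import org.apache.hyracks.api.job.profiling.OperatorStats;

    // Hedged sketch: aggregating two tasks' stats for the same operator.
    static IOperatorStats mergeTaskStats(IOperatorStats task0, IOperatorStats task1) {
        IOperatorStats merged = new OperatorStats("dataset-stats");
        merged.updateFrom(task0);
        merged.updateFrom(task1);
        return merged;
    }
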
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
index 0026af4..ee49908 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
@@ -39,7 +39,7 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class JobProfile extends AbstractProfile {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private JobId jobId;
 
@@ -149,9 +149,7 @@
                     opOutStats = new OperatorStats(operatorName);
                     outStats[i] = opOutStats;
                 }
-                opOutStats.getTupleCounter().update(opTaskStats.getTupleCounter().get());
-                opOutStats.getTimeCounter().update(opTaskStats.getTimeCounter().get());
-                opOutStats.getDiskIoCounter().update(opTaskStats.getDiskIoCounter().get());
+                opOutStats.updateFrom(opTaskStats);
             }
         }
         return Arrays.asList(outStats);