[ASTERIXDB-3085][IDX] Collect secondary index stats during ANALYZE DATASET
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
As part of collecting stats about a dataset using ANALYZE DATASET, collect
stats about the secondary indexes. The stats include the number of pages.
Backport changes:
- Compensate for not having the additional profiler stats that were added before this change
Change-Id: I800ed3015832c311c7075f7cc8c5325b2fc62265
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17277
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17335
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index fa5507f..b017648 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -211,7 +211,7 @@
import org.apache.asterix.runtime.fulltext.FullTextConfigDescriptor;
import org.apache.asterix.runtime.fulltext.IFullTextFilterDescriptor;
import org.apache.asterix.runtime.fulltext.StopwordsFullTextFilterDescriptor;
-import org.apache.asterix.runtime.operators.StreamStats;
+import org.apache.asterix.runtime.operators.DatasetStreamStats;
import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
import org.apache.asterix.translator.AbstractLangTranslator;
import org.apache.asterix.translator.ClientRequest;
@@ -4266,9 +4266,9 @@
int sampleCardinalityTarget = stmtAnalyze.getSampleSize();
long sampleSeed = stmtAnalyze.getOrCreateSampleSeed();
- Index.SampleIndexDetails newIndexDetailsPendingAdd =
- new Index.SampleIndexDetails(dsDetails.getPrimaryKey(), dsDetails.getKeySourceIndicator(),
- dsDetails.getPrimaryKeyType(), sampleCardinalityTarget, 0, 0, sampleSeed);
+ Index.SampleIndexDetails newIndexDetailsPendingAdd = new Index.SampleIndexDetails(dsDetails.getPrimaryKey(),
+ dsDetails.getKeySourceIndicator(), dsDetails.getPrimaryKeyType(), sampleCardinalityTarget, 0, 0,
+ sampleSeed, Collections.emptyMap());
newIndexPendingAdd = new Index(dataverseName, datasetName, newIndexName, sampleIndexType,
newIndexDetailsPendingAdd, false, false, MetadataUtil.PENDING_ADD_OP);
@@ -4306,11 +4306,11 @@
if (opStats == null || opStats.size() == 0) {
throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, "", sourceLoc);
}
- StreamStats stats = new StreamStats(opStats.get(0));
+ DatasetStreamStats stats = new DatasetStreamStats(opStats.get(0));
Index.SampleIndexDetails newIndexDetailsFinal = new Index.SampleIndexDetails(dsDetails.getPrimaryKey(),
dsDetails.getKeySourceIndicator(), dsDetails.getPrimaryKeyType(), sampleCardinalityTarget,
- stats.getCardinality(), stats.getAvgTupleSize(), sampleSeed);
+ stats.getCardinality(), stats.getAvgTupleSize(), sampleSeed, stats.getIndexesStats());
Index newIndexFinal = new Index(dataverseName, datasetName, newIndexName, sampleIndexType,
newIndexDetailsFinal, false, false, MetadataUtil.PENDING_NO_OP);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.01.ddl.sqlpp
new file mode 100644
index 0000000..8c374a2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.01.ddl.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Test collecting secondary indexes stats with ANALYZE DATASET statement
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE TYPE testType AS OPEN {
+ id : uuid
+};
+
+CREATE DATASET ds1(testType) PRIMARY KEY id AUTOGENERATED;
+CREATE DATASET ds2(testType) PRIMARY KEY id AUTOGENERATED;
+CREATE DATASET ds3(testType) PRIMARY KEY id AUTOGENERATED;
+CREATE DATASET ds4(testType) PRIMARY KEY id AUTOGENERATED;
+
+CREATE INDEX ds1_idx1 ON ds1(name: string);
+CREATE INDEX ds1_idx2 ON ds1(UNNEST interests: string) EXCLUDE UNKNOWN KEY;;
+CREATE PRIMARY INDEX ds1_idx3 ON ds1;
+
+CREATE INDEX ds2_idx1 ON ds2(name: string);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.02.update.sqlpp
new file mode 100644
index 0000000..711b914
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.02.update.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+LOAD DATASET ds1 USING localfs (("path"="asterix_nc1://data/semistructured/co1k_olist/customer.adm"),("format"="adm"));
+LOAD DATASET ds2 USING localfs (("path"="asterix_nc1://data/semistructured/co1k_olist/customer.adm"),("format"="adm"));
+LOAD DATASET ds3 USING localfs (("path"="asterix_nc1://data/semistructured/co1k_olist/customer.adm"),("format"="adm"));
+LOAD DATASET ds4 USING localfs (("path"="asterix_nc1://data/semistructured/co1k_olist/customer.adm"),("format"="adm"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.03.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.03.ddl.sqlpp
new file mode 100644
index 0000000..97e07ba
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.03.ddl.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+ANALYZE DATASET ds1;
+ANALYZE DATASET ds2;
+ANALYZE DATASET ds3;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.query.sqlpp
new file mode 100644
index 0000000..9c01681
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM `Metadata`.`Index` t WHERE t.IndexStructure = "SAMPLE"
+SELECT t.* EXCLUDE DataverseName, SearchKey, IsPrimary, Timestamp, PendingOp, SampleSeed
+ORDER BY t.DatasetName, t.IndexName;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.05.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.05.ddl.sqlpp
new file mode 100644
index 0000000..4a2edf0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.05.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+DROP INDEX ds1.ds1_idx3;
+ANALYZE DATASET ds1;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.query.sqlpp
new file mode 100644
index 0000000..9c01681
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+FROM `Metadata`.`Index` t WHERE t.IndexStructure = "SAMPLE"
+SELECT t.* EXCLUDE DataverseName, SearchKey, IsPrimary, Timestamp, PendingOp, SampleSeed
+ORDER BY t.DatasetName, t.IndexName;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.99.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.99.ddl.sqlpp
new file mode 100644
index 0000000..36b2bab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.99.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.adm
new file mode 100644
index 0000000..bc2792b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.adm
@@ -0,0 +1,3 @@
+{ "DatasetName": "ds1", "IndexName": "sample_idx_1_ds1", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369, "IndexesStats": [ { "IndexName": "ds1_idx3", "NumPages": 8 }, { "IndexName": "ds1_idx2", "NumPages": 8 }, { "IndexName": "ds1_idx1", "NumPages": 8 } ] }
+{ "DatasetName": "ds2", "IndexName": "sample_idx_1_ds2", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369, "IndexesStats": [ { "IndexName": "ds2_idx1", "NumPages": 8 } ] }
+{ "DatasetName": "ds3", "IndexName": "sample_idx_1_ds3", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.adm
new file mode 100644
index 0000000..08f8659
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.adm
@@ -0,0 +1,3 @@
+{ "DatasetName": "ds1", "IndexName": "sample_idx_2_ds1", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369, "IndexesStats": [ { "IndexName": "ds1_idx2", "NumPages": 8 }, { "IndexName": "ds1_idx1", "NumPages": 8 } ] }
+{ "DatasetName": "ds2", "IndexName": "sample_idx_1_ds2", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369, "IndexesStats": [ { "IndexName": "ds2_idx1", "NumPages": 8 } ] }
+{ "DatasetName": "ds3", "IndexName": "sample_idx_1_ds3", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index f84472e..c87f368 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -46,9 +46,13 @@
private StoragePathUtil() {
}
+ public static IFileSplitProvider splitProvider(FileSplit[] splits) {
+ return new ConstantFileSplitProvider(splits);
+ }
+
public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
FileSplit[] splits) {
- IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
+ IFileSplitProvider splitProvider = splitProvider(splits);
String[] loc = new String[splits.length];
for (int p = 0; p < splits.length; p++) {
loc[p] = splits[p].getNodeName();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 712a216..53cf3d0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -26,11 +26,13 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -1830,6 +1832,24 @@
return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
}
+ public List<Pair<IFileSplitProvider, String>> getSplitProviderOfAllIndexes(Dataset ds) throws AlgebricksException {
+ List<Index> dsIndexes = getDatasetIndexes(ds.getDataverseName(), ds.getDatasetName()).stream()
+ .filter(idx -> idx.getIndexType() != IndexType.SAMPLE && idx.isSecondaryIndex())
+ .collect(Collectors.toList());
+ if (dsIndexes.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<String> datasetNodes = findNodes(ds.getNodeGroupName());
+ List<Pair<IFileSplitProvider, String>> indexesSplits =
+ dsIndexes.stream()
+ .map(idx -> new Pair<>(
+ StoragePathUtil.splitProvider(SplitsAndConstraintsUtil.getIndexSplits(
+ appCtx.getClusterStateManager(), ds, idx.getIndexName(), datasetNodes)),
+ idx.getIndexName()))
+ .collect(Collectors.toList());
+ return indexesSplits;
+ }
+
public LockList getLocks() {
return locks;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
index 21d2aaa..bbccd65 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
@@ -22,6 +22,7 @@
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -39,6 +40,7 @@
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.job.profiling.IndexStats;
import org.apache.hyracks.util.OptionalBoolean;
/**
@@ -554,10 +556,11 @@
private final int sourceAvgItemSize;
private final long sampleSeed;
+ private final Map<String, IndexStats> indexesStats;
public SampleIndexDetails(List<List<String>> keyFieldNames, List<Integer> keyFieldSourceIndicators,
List<IAType> keyFieldTypes, int sampleCardinalityTarget, long sourceCardinality, int sourceAvgItemSize,
- long sampleSeed) {
+ long sampleSeed, Map<String, IndexStats> indexesStats) {
this.keyFieldNames = keyFieldNames;
this.keyFieldSourceIndicators = keyFieldSourceIndicators;
this.keyFieldTypes = keyFieldTypes;
@@ -565,6 +568,7 @@
this.sourceCardinality = sourceCardinality;
this.sourceAvgItemSize = sourceAvgItemSize;
this.sampleSeed = sampleSeed;
+ this.indexesStats = indexesStats;
}
@Override
@@ -604,6 +608,10 @@
public long getSampleSeed() {
return sampleSeed;
}
+
+ public Map<String, IndexStats> getIndexesStats() {
+ return indexesStats;
+ }
}
@Deprecated
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
index 9c742ed..fc78d10 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
@@ -25,7 +25,9 @@
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import org.apache.asterix.builders.IARecordBuilder;
@@ -49,6 +51,7 @@
import org.apache.asterix.om.base.AInt32;
import org.apache.asterix.om.base.AInt64;
import org.apache.asterix.om.base.AInt8;
+import org.apache.asterix.om.base.AMutableInt64;
import org.apache.asterix.om.base.AMutableInt8;
import org.apache.asterix.om.base.ANull;
import org.apache.asterix.om.base.AOrderedList;
@@ -67,6 +70,7 @@
import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IndexStats;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.util.OptionalBoolean;
@@ -95,6 +99,9 @@
public static final String SAMPLE_CARDINALITY_TARGET = "SampleCardinalityTarget";
public static final String SOURCE_CARDINALITY = "SourceCardinality";
public static final String SOURCE_AVG_ITEM_SIZE = "SourceAvgItemSize";
+ public static final String INDEXES_STATS = "IndexesStats";
+ public static final String STATS_NUM_PAGES = "NumPages";
+ public static final String STATS_INDEX_NAME = "IndexName";
protected final TxnId txnId;
protected final MetadataNode metadataNode;
@@ -105,13 +112,15 @@
protected OrderedListBuilder complexSearchKeyNameListBuilder;
protected IARecordBuilder complexSearchKeyNameRecordBuilder;
protected IARecordBuilder castRecordBuilder;
+ protected OrderedListBuilder indexesStatsListBuilder;
+ protected IARecordBuilder indexStatsRecordBuilder;
protected AOrderedListType stringList;
protected AOrderedListType int8List;
protected ArrayBackedValueStorage nameValue;
protected ArrayBackedValueStorage itemValue;
protected AMutableInt8 aInt8;
+ protected AMutableInt64 aInt64;
protected ISerializerDeserializer<AInt8> int8Serde;
- protected ISerializerDeserializer<ANull> nullSerde;
@SuppressWarnings("unchecked")
protected IndexTupleTranslator(TxnId txnId, MetadataNode metadataNode, boolean getTuple) {
@@ -124,14 +133,16 @@
primaryKeyListBuilder = new OrderedListBuilder();
complexSearchKeyNameRecordBuilder = new RecordBuilder();
castRecordBuilder = new RecordBuilder();
+ indexesStatsListBuilder = new OrderedListBuilder();
+ indexStatsRecordBuilder = new RecordBuilder();
complexSearchKeyNameListBuilder = new OrderedListBuilder();
stringList = new AOrderedListType(BuiltinType.ASTRING, null);
int8List = new AOrderedListType(BuiltinType.AINT8, null);
nameValue = new ArrayBackedValueStorage();
itemValue = new ArrayBackedValueStorage();
aInt8 = new AMutableInt8((byte) 0);
+ aInt64 = new AMutableInt64(0);
int8Serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
- nullSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
}
}
@@ -490,8 +501,26 @@
}
int sourceAvgItemSize = ((AInt32) indexRecord.getValueByPos(sourceAvgItemSizePos)).getIntegerValue();
+ int indexesStatsPos = indexRecord.getType().getFieldIndex(INDEXES_STATS);
+ Map<String, IndexStats> indexesStats;
+ if (indexesStatsPos >= 0) {
+ AOrderedList indexesStatsList = (AOrderedList) indexRecord.getValueByPos(indexesStatsPos);
+ int numIndexes = indexesStatsList.size();
+ indexesStats = numIndexes > 0 ? new HashMap<>() : Collections.emptyMap();
+ for (int i = 0; i < numIndexes; i++) {
+ ARecord stats = (ARecord) indexesStatsList.getItem(i);
+ IAObject numPages = stats.getValueByPos(stats.getType().getFieldIndex(STATS_NUM_PAGES));
+ IAObject idxNameObj = stats.getValueByPos(stats.getType().getFieldIndex(STATS_INDEX_NAME));
+ String idxName = ((AString) idxNameObj).getStringValue();
+ IndexStats idxStats = new IndexStats(idxName, ((AInt64) numPages).getLongValue());
+ indexesStats.put(idxName, idxStats);
+ }
+ } else {
+ indexesStats = Collections.emptyMap();
+ }
+
indexDetails = new Index.SampleIndexDetails(keyFieldNames, keyFieldSourceIndicator, keyFieldTypes,
- sampleCardinalityTarget, sourceCardinality, sourceAvgItemSize, sampleSeed);
+ sampleCardinalityTarget, sourceCardinality, sourceAvgItemSize, sampleSeed, indexesStats);
break;
default:
throw new AsterixException(ErrorCode.METADATA_ERROR, indexType.toString());
@@ -935,6 +964,41 @@
stringSerde.serialize(aString, nameValue.getDataOutput());
int32Serde.serialize(new AInt32(indexDetails.getSourceAvgItemSize()), fieldValue.getDataOutput());
recordBuilder.addField(nameValue, fieldValue);
+
+ Map<String, IndexStats> indexesStats = indexDetails.getIndexesStats();
+ if (!indexesStats.isEmpty()) {
+ indexesStatsListBuilder.reset(AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE);
+ for (Map.Entry<String, IndexStats> stats : indexesStats.entrySet()) {
+ indexStatsRecordBuilder.reset(RecordUtil.FULLY_OPEN_RECORD_TYPE);
+ // index name
+ nameValue.reset();
+ itemValue.reset();
+ aString.setValue(STATS_INDEX_NAME);
+ stringSerde.serialize(aString, nameValue.getDataOutput());
+ aString.setValue(stats.getKey());
+ stringSerde.serialize(aString, itemValue.getDataOutput());
+ indexStatsRecordBuilder.addField(nameValue, itemValue);
+
+ // index number of pages
+ nameValue.reset();
+ itemValue.reset();
+ aString.setValue(STATS_NUM_PAGES);
+ stringSerde.serialize(aString, nameValue.getDataOutput());
+ aInt64.setValue(stats.getValue().getNumPages());
+ int64Serde.serialize(aInt64, itemValue.getDataOutput());
+ indexStatsRecordBuilder.addField(nameValue, itemValue);
+
+ itemValue.reset();
+ indexStatsRecordBuilder.write(itemValue.getDataOutput(), true);
+ indexesStatsListBuilder.addItem(itemValue);
+ }
+ nameValue.reset();
+ fieldValue.reset();
+ aString.setValue(INDEXES_STATS);
+ stringSerde.serialize(aString, nameValue.getDataOutput());
+ indexesStatsListBuilder.write(fieldValue.getDataOutput(), true);
+ recordBuilder.addField(nameValue, fieldValue);
+ }
}
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
index 0d3e015..056a8c2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
@@ -19,6 +19,7 @@
package org.apache.asterix.metadata.utils;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -36,8 +37,8 @@
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.runtime.aggregates.collections.FirstElementEvalFactory;
import org.apache.asterix.runtime.evaluators.comparisons.GreaterThanDescriptor;
+import org.apache.asterix.runtime.operators.DatasetStreamStatsOperatorDescriptor;
import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
-import org.apache.asterix.runtime.operators.StreamStatsOperatorDescriptor;
import org.apache.asterix.runtime.runningaggregates.std.SampleSlotRunningAggregateFunctionFactory;
import org.apache.asterix.runtime.runningaggregates.std.TidRunningAggregateDescriptor;
import org.apache.asterix.runtime.utils.RuntimeUtils;
@@ -82,6 +83,7 @@
import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.storage.common.IStorageManager;
/**
* Utility class for sampling operations.
@@ -164,10 +166,10 @@
for (int i = 0; i < nFields; i++) {
columns[i] = i;
}
-
+ IStorageManager storageMgr = metadataProvider.getStorageComponentProvider().getStorageManager();
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
- metadataProvider.getStorageComponentProvider().getStorageManager(), fileSplitProvider);
+ IIndexDataflowHelperFactory dataflowHelperFactory =
+ new IndexDataflowHelperFactory(storageMgr, fileSplitProvider);
// job spec:
IndexUtil.bindJobEventListener(spec, metadataProvider);
@@ -179,7 +181,16 @@
sourceOp = targetOp;
// primary index scan ----> stream stats op
- targetOp = new StreamStatsOperatorDescriptor(spec, recordDesc, DATASET_STATS_OPERATOR_NAME);
+ List<Pair<IFileSplitProvider, String>> indexesInfo = metadataProvider.getSplitProviderOfAllIndexes(dataset);
+ IndexDataflowHelperFactory[] indexes = new IndexDataflowHelperFactory[indexesInfo.size()];
+ String[] names = new String[indexesInfo.size()];
+ for (int i = 0; i < indexes.length; i++) {
+ Pair<IFileSplitProvider, String> indexInfo = indexesInfo.get(i);
+ indexes[i] = new IndexDataflowHelperFactory(storageMgr, indexInfo.first);
+ names[i] = indexInfo.second;
+ }
+ targetOp =
+ new DatasetStreamStatsOperatorDescriptor(spec, recordDesc, DATASET_STATS_OPERATOR_NAME, indexes, names);
spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
sourceOp = targetOp;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStats.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStats.java
similarity index 73%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStats.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStats.java
index 268009f..8ea267a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStats.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStats.java
@@ -19,26 +19,33 @@
package org.apache.asterix.runtime.operators;
+import java.util.Map;
+
import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.IndexStats;
/**
- * Helper method to access stats produced by {@link org.apache.asterix.runtime.operators.StreamStatsOperatorDescriptor}
+ * Helper class to access stats produced by {@link DatasetStreamStatsOperatorDescriptor}
*/
-public final class StreamStats {
+public final class DatasetStreamStats {
private final long cardinality;
private final int avgTupleSize;
- public StreamStats(IOperatorStats opStats) {
+ private final Map<String, IndexStats> indexesStats;
+
+ public DatasetStreamStats(IOperatorStats opStats) {
this.cardinality = opStats.getTupleCounter().get();
long totalTupleSize = opStats.getDiskIoCounter().get();
this.avgTupleSize = cardinality > 0 ? (int) (totalTupleSize / cardinality) : 0;
+ this.indexesStats = opStats.getIndexesStats();
}
- static void update(IOperatorStats opStats, long tupleCount, long tupleSize) {
+ static void update(IOperatorStats opStats, long tupleCount, long tupleSize, Map<String, IndexStats> indexStats) {
opStats.getTupleCounter().update(tupleCount);
opStats.getDiskIoCounter().update(tupleSize);
+ opStats.updateIndexesStats(indexStats);
}
public long getCardinality() {
@@ -48,4 +55,8 @@
public int getAvgTupleSize() {
return avgTupleSize;
}
+
+ public Map<String, IndexStats> getIndexesStats() {
+ return indexesStats;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
new file mode 100644
index 0000000..ba29450
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.runtime.operators;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.api.job.profiling.IndexStats;
+import org.apache.hyracks.api.job.profiling.OperatorStats;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.AbstractLSMWithBloomFilterDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+/**
+ * Computes total tuple count and total tuple length for all input tuples, collects the number of
+ * disk pages of each supplied index for the runtime's partition, and emits these values as operator stats.
+ */
+public final class DatasetStreamStatsOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String operatorName;
+    private final IIndexDataflowHelperFactory[] indexes;
+    private final String[] indexesNames;
+
+    /**
+     * @param spec registry the operator is registered with
+     * @param rDesc record descriptor of the tuples passing through this operator
+     * @param operatorName name under which the stats are published; also used as the display name
+     * @param indexes dataflow helper factories of the indexes whose page counts are collected
+     * @param indexesNames names of the indexes; {@code indexesNames[i]} labels {@code indexes[i]}
+     */
+    public DatasetStreamStatsOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
+            String operatorName, IIndexDataflowHelperFactory[] indexes, String[] indexesNames) {
+        super(spec, 1, 1);
+        outRecDescs[0] = rDesc;
+        this.operatorName = operatorName;
+        this.indexes = indexes;
+        this.indexesNames = indexesNames;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+
+        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+
+            private FrameTupleAccessor fta;
+            private long totalTupleCount;
+            private long totalTupleLength;
+            // Runtime-local on purpose: one descriptor can create several runtimes (one per partition),
+            // so keeping this map on the descriptor would let them overwrite each other's stats.
+            private Map<String, IndexStats> indexStats;
+
+            @Override
+            public void open() throws HyracksDataException {
+                fta = new FrameTupleAccessor(outRecDescs[0]);
+                totalTupleCount = 0;
+                totalTupleLength = 0;
+                writer.open();
+                IStatsCollector coll = ctx.getStatsCollector();
+                if (coll != null) {
+                    coll.add(new OperatorStats(operatorName));
+                }
+                INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
+                indexStats = new HashMap<>();
+                for (int i = 0; i < indexes.length; i++) {
+                    IIndexDataflowHelper idxFlowHelper = indexes[i].create(serviceCtx, partition);
+                    try {
+                        idxFlowHelper.open();
+                        ILSMIndex indexInstance = (ILSMIndex) idxFlowHelper.getIndexInstance();
+                        long numPages = 0;
+                        // walk the disk components while holding the operation tracker's monitor
+                        synchronized (indexInstance.getOperationTracker()) {
+                            for (ILSMDiskComponent component : indexInstance.getDiskComponents()) {
+                                long componentSize = component.getComponentSize();
+                                if (component instanceof AbstractLSMWithBloomFilterDiskComponent) {
+                                    // the bloom filter file is not counted towards the index pages
+                                    componentSize -= ((AbstractLSMWithBloomFilterDiskComponent) component)
+                                            .getBloomFilter().getFileReference().getFile().length();
+                                }
+                                numPages += componentSize / indexInstance.getBufferCache().getPageSize();
+                            }
+                        }
+                        indexStats.put(indexesNames[i], new IndexStats(indexesNames[i], numPages));
+                    } finally {
+                        idxFlowHelper.close();
+                    }
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                fta.reset(buffer);
+                computeStats();
+                FrameUtils.flushFrame(buffer, writer);
+            }
+
+            // adds the tuple count and the tuple lengths of the current frame to the running totals
+            private void computeStats() {
+                int n = fta.getTupleCount();
+                totalTupleCount += n;
+                for (int i = 0; i < n; i++) {
+                    totalTupleLength += fta.getTupleLength(i);
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                try {
+                    IStatsCollector statsCollector = ctx.getStatsCollector();
+                    if (statsCollector != null) {
+                        IOperatorStats stats = statsCollector.getOrAddOperatorStats(operatorName);
+                        DatasetStreamStats.update(stats, totalTupleCount, totalTupleLength, indexStats);
+                    }
+                } finally {
+                    // always propagate close downstream, even if publishing the stats failed
+                    writer.close();
+                }
+            }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                writer.flush();
+            }
+
+            @Override
+            public String getDisplayName() {
+                return operatorName;
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStatsOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStatsOperatorDescriptor.java
deleted file mode 100644
index 353401a..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStatsOperatorDescriptor.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.runtime.operators;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.api.job.profiling.IOperatorStats;
-import org.apache.hyracks.api.job.profiling.IStatsCollector;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-
-/**
- * Computes total tuple count and total tuple length for all input tuples,
- * and emits these values as operator stats.
- */
-public final class StreamStatsOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
-
- private final String operatorName;
-
- public StreamStatsOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
- String operatorName) {
- super(spec, 1, 1);
- outRecDescs[0] = rDesc;
- this.operatorName = operatorName;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-
- return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-
- private FrameTupleAccessor fta;
- private long totalTupleCount;
- private long totalTupleLength;
-
- @Override
- public void open() throws HyracksDataException {
- fta = new FrameTupleAccessor(outRecDescs[0]);
- totalTupleCount = 0;
- writer.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- fta.reset(buffer);
- computeStats();
- FrameUtils.flushFrame(buffer, writer);
- }
-
- private void computeStats() {
- int n = fta.getTupleCount();
- totalTupleCount += n;
- for (int i = 0; i < n; i++) {
- totalTupleLength += fta.getTupleLength(i);
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
- public void close() throws HyracksDataException {
- IStatsCollector statsCollector = ctx.getStatsCollector();
- if (statsCollector != null) {
- IOperatorStats stats = statsCollector.getOrAddOperatorStats(operatorName);
- StreamStats.update(stats, totalTupleCount, totalTupleLength);
- }
- writer.close();
- }
-
- @Override
- public void flush() throws HyracksDataException {
- writer.flush();
- }
-
- @Override
- public String getDisplayName() {
- return operatorName;
- }
- };
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
index 181249a..0d38fac 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.api.job.profiling;
import java.io.Serializable;
+import java.util.Map;
import org.apache.hyracks.api.io.IWritable;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
@@ -46,4 +47,10 @@
* @return A counter used to track the number of pages pinned by an opeartor
*/
ICounter getDiskIoCounter();
+
+ void updateIndexesStats(Map<String, IndexStats> indexesStats);
+
+ Map<String, IndexStats> getIndexesStats();
+
+ void updateFrom(IOperatorStats stats);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IndexStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IndexStats.java
new file mode 100644
index 0000000..0c471ef
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IndexStats.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job.profiling;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
+import org.apache.hyracks.api.io.IWritable;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
+
+/**
+ * Page-count statistics of a single index, identified by the index name.
+ */
+public class IndexStats implements IWritable, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final ICounter numPages;
+    private String indexName;
+
+    /**
+     * @param indexName name of the index the stats belong to
+     * @param numPages initial value of the page counter
+     */
+    public IndexStats(String indexName, long numPages) {
+        ICounter pageCounter = new Counter("numPages");
+        pageCounter.set(numPages);
+        this.numPages = pageCounter;
+        this.indexName = indexName;
+    }
+
+    /**
+     * Builds an instance from the layout written by {@link #writeFields(DataOutput)}:
+     * the index name as UTF followed by the page count as a long.
+     */
+    public static IndexStats create(DataInput input) throws IOException {
+        // arguments are evaluated left to right, so the name is read before the page count
+        return new IndexStats(input.readUTF(), input.readLong());
+    }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeUTF(indexName);
+        output.writeLong(numPages.get());
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        indexName = input.readUTF();
+        numPages.set(input.readLong());
+    }
+
+    /** Applies {@code delta} to the page counter through {@link ICounter#update(long)}. */
+    public void updateNumPages(long delta) {
+        numPages.update(delta);
+    }
+
+    /** Returns the current value of the page counter. */
+    public long getNumPages() {
+        return numPages.get();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("IndexStats{indexName='");
+        sb.append(indexName).append("', numPages=").append(numPages.get()).append('}');
+        return sb.toString();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
index 08c1adc..6b7deb6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
@@ -21,6 +21,8 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hyracks.api.com.job.profiling.counters.Counter;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
@@ -32,6 +34,7 @@
public final ICounter tupleCounter;
public final ICounter timeCounter;
public final ICounter diskIoCounter;
+ private final Map<String, IndexStats> indexesStats;
public OperatorStats(String operatorName) {
if (operatorName == null || operatorName.isEmpty()) {
@@ -41,6 +44,7 @@
tupleCounter = new Counter("tupleCounter");
timeCounter = new Counter("timeCounter");
diskIoCounter = new Counter("diskIoCounter");
+ indexesStats = new HashMap<>();
}
public static IOperatorStats create(DataInput input) throws IOException {
@@ -71,11 +75,42 @@
}
@Override
+ public void updateIndexesStats(Map<String, IndexStats> stats) {
+ if (stats == null) {
+ return;
+ }
+ for (Map.Entry<String, IndexStats> stat : stats.entrySet()) {
+ String indexName = stat.getKey();
+ IndexStats indexStat = stat.getValue();
+ IndexStats existingIndexStat = indexesStats.get(indexName);
+ if (existingIndexStat == null) {
+ indexesStats.put(indexName, new IndexStats(indexName, indexStat.getNumPages()));
+ } else {
+ existingIndexStat.updateNumPages(indexStat.getNumPages());
+ }
+ }
+ }
+
+ @Override
+ public Map<String, IndexStats> getIndexesStats() {
+ return indexesStats;
+ }
+
+ @Override
+ public void updateFrom(IOperatorStats stats) {
+ tupleCounter.update(stats.getTupleCounter().get());
+ timeCounter.update(stats.getTimeCounter().get());
+ diskIoCounter.update(stats.getDiskIoCounter().get());
+ updateIndexesStats(stats.getIndexesStats());
+ }
+
+ @Override
public void writeFields(DataOutput output) throws IOException {
output.writeUTF(operatorName);
output.writeLong(tupleCounter.get());
output.writeLong(timeCounter.get());
output.writeLong(diskIoCounter.get());
+ writeIndexesStats(output);
}
@Override
@@ -83,11 +118,30 @@
tupleCounter.set(input.readLong());
timeCounter.set(input.readLong());
diskIoCounter.set(input.readLong());
+ readIndexesStats(input);
+ }
+
+ private void writeIndexesStats(DataOutput output) throws IOException {
+ output.writeInt(indexesStats.size());
+ for (Map.Entry<String, IndexStats> indexStat : indexesStats.entrySet()) {
+ output.writeUTF(indexStat.getKey());
+ indexStat.getValue().writeFields(output);
+ }
+ }
+
+ private void readIndexesStats(DataInput input) throws IOException {
+ int numIndexes = input.readInt();
+ for (int i = 0; i < numIndexes; i++) {
+ String indexName = input.readUTF();
+ IndexStats indexStats = IndexStats.create(input);
+ indexesStats.put(indexName, indexStats);
+ }
}
@Override
public String toString() {
return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"" + tupleCounter.getName() + "\": "
- + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": " + timeCounter.get() + " }";
+                + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": " + timeCounter.get()
+                + ", \"indexesStats\": \"" + indexesStats + "\" }";
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
index 0026af4..ee49908 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
@@ -39,7 +39,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
public class JobProfile extends AbstractProfile {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private JobId jobId;
@@ -149,9 +149,7 @@
opOutStats = new OperatorStats(operatorName);
outStats[i] = opOutStats;
}
- opOutStats.getTupleCounter().update(opTaskStats.getTupleCounter().get());
- opOutStats.getTimeCounter().update(opTaskStats.getTimeCounter().get());
- opOutStats.getDiskIoCounter().update(opTaskStats.getDiskIoCounter().get());
+ opOutStats.updateFrom(opTaskStats);
}
}
return Arrays.asList(outStats);