[ASTERIXDB-3141][ASTERIXDB-3134] Allow querying columnar datasets
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details:
This patch adds the ability to query columnar datasets.
Also, it teaches the compiler to read only the requested
columns. This patch also includes the ability to filter
mega-leaf nodes given a query predicate.
Interface changes:
- IMetadataProvider#getScannerRuntime()
* Changed to allow projections for both data records and meta records
- IProjectionInfo
* Renamed to IProjectionFiltrationInfo
* Added getFilterExpression() for columnar filters
User model changes:
- After this change you can create columnar datasets
Example:
CREATE DATASET ExperDataset(ExperType)
PRIMARY KEY uid AUTOGENERATED
WITH {
"dataset-format":{"format":"column"}
};
- Added compiler property:
* compiler.column.filter
to enable/disable the use of columnar filters
- Added storage properties:
* storage.column.max.tuple.count
An integer specifying the maximum number of
tuples to store per mega-leaf node
* storage.column.free.space.tolerance
The percentage of empty space tolerated in order to
minimize column splitting
Change-Id: Ie9188bbd8463db22bf10c6871046c680528d5640
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17430
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
index cadc714..8e0bc0c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
@@ -91,5 +91,9 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
new file mode 100644
index 0000000..56090bb
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.dataflow;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.io.IJsonSerializable;
+import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.INullIntrospector;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManagerFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.utils.LSMColumnBTreeUtil;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResource;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
+import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class LSMColumnBTreeLocalResource extends LSMBTreeLocalResource {
+ private final IColumnManagerFactory columnManagerFactory;
+
+ public LSMColumnBTreeLocalResource(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
+ int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, String path,
+ IStorageManager storageManager, ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties, int[] btreeFields, ILSMOperationTrackerFactory opTrackerProvider,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
+ IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
+ ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+ ICompressorDecompressorFactory compressorDecompressorFactory, ITypeTraits nullTypeTraits,
+ INullIntrospector nullIntrospector, boolean isSecondaryNoIncrementalMaintenance,
+ IColumnManagerFactory columnManagerFactory) {
+ super(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, true, path, storageManager,
+ mergePolicyFactory, mergePolicyProperties, null, null, btreeFields, null, opTrackerProvider,
+ ioOpCallbackFactory, pageWriteCallbackFactory, metadataPageManagerFactory, vbcProvider,
+ ioSchedulerProvider, true, compressorDecompressorFactory, true, nullTypeTraits, nullIntrospector,
+ isSecondaryNoIncrementalMaintenance);
+ this.columnManagerFactory = columnManagerFactory;
+ }
+
+ private LSMColumnBTreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields,
+ double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields,
+ ICompressorDecompressorFactory compressorDecompressorFactory, boolean hasBloomFilter,
+ boolean isSecondaryNoIncrementalMaintenance, IColumnManagerFactory columnManagerFactory)
+ throws HyracksDataException {
+ super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields,
+ compressorDecompressorFactory, hasBloomFilter, isSecondaryNoIncrementalMaintenance);
+ this.columnManagerFactory = columnManagerFactory;
+ }
+
+ @Override
+ public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
+ IIOManager ioManager = serviceCtx.getIoManager();
+ FileReference file = ioManager.resolve(path);
+ List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
+ ioOpCallbackFactory.initialize(serviceCtx, this);
+ pageWriteCallbackFactory.initialize(serviceCtx, this);
+ return LSMColumnBTreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx),
+ typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
+ opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
+ ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields, metadataPageManagerFactory, false,
+ serviceCtx.getTracer(), compressorDecompressorFactory, nullTypeTraits, nullIntrospector,
+ columnManagerFactory);
+ }
+
+ public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json)
+ throws HyracksDataException {
+ int[] bloomFilterKeyFields = OBJECT_MAPPER.convertValue(json.get("bloomFilterKeyFields"), int[].class);
+ double bloomFilterFalsePositiveRate = json.get("bloomFilterFalsePositiveRate").asDouble();
+ boolean isPrimary = json.get("isPrimary").asBoolean();
+ boolean hasBloomFilter = getOrDefaultHasBloomFilter(json, isPrimary);
+ int[] btreeFields = OBJECT_MAPPER.convertValue(json.get("btreeFields"), int[].class);
+ JsonNode compressorDecompressorNode = json.get("compressorDecompressorFactory");
+ ICompressorDecompressorFactory compDecompFactory = (ICompressorDecompressorFactory) registry
+ .deserializeOrDefault(compressorDecompressorNode, NoOpCompressorDecompressorFactory.class);
+ JsonNode columnManagerFactoryNode = json.get("columnManagerFactory");
+ boolean isSecondaryNoIncrementalMaintenance =
+ getOrDefaultBoolean(json, "isSecondaryNoIncrementalMaintenance", false);
+ IColumnManagerFactory columnManagerFactory =
+ (IColumnManagerFactory) registry.deserialize(columnManagerFactoryNode);
+ return new LSMColumnBTreeLocalResource(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
+ isPrimary, btreeFields, compDecompFactory, hasBloomFilter, isSecondaryNoIncrementalMaintenance,
+ columnManagerFactory);
+ }
+
+ @Override
+ protected void appendToJson(final ObjectNode json, IPersistedResourceRegistry registry)
+ throws HyracksDataException {
+ super.appendToJson(json, registry);
+ json.putPOJO("columnManagerFactory", columnManagerFactory.toJson(registry));
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
new file mode 100644
index 0000000..eccb7c2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.dataflow;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.INullIntrospector;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManagerFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResourceFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
+import org.apache.hyracks.storage.common.IStorageManager;
+
+public class LSMColumnBTreeLocalResourceFactory extends LSMBTreeLocalResourceFactory {
+ private static final long serialVersionUID = -676367767925618165L;
+ private final IColumnManagerFactory columnManagerFactory;
+
+ public LSMColumnBTreeLocalResourceFactory(IStorageManager storageManager, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] cmpFactories, ITypeTraits[] filterTypeTraits,
+ IBinaryComparatorFactory[] filterCmpFactories, int[] filterFields,
+ ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
+ ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
+ IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
+ ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties, int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate,
+ int[] btreeFields, ICompressorDecompressorFactory compressorDecompressorFactory, ITypeTraits nullTypeTraits,
+ INullIntrospector nullIntrospector, boolean isSecondaryNoIncrementalMaintenance,
+ IColumnManagerFactory columnManagerFactory) {
+ super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
+ opTrackerFactory, ioOpCallbackFactory, pageWriteCallbackFactory, metadataPageManagerFactory,
+ vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, true, bloomFilterKeyFields,
+ bloomFilterFalsePositiveRate, true, btreeFields, compressorDecompressorFactory, true, nullTypeTraits,
+ nullIntrospector, isSecondaryNoIncrementalMaintenance);
+ this.columnManagerFactory = columnManagerFactory;
+ }
+
+ @Override
+ public LsmResource createResource(FileReference fileRef) {
+ return new LSMColumnBTreeLocalResource(typeTraits, cmpFactories, bloomFilterKeyFields,
+ bloomFilterFalsePositiveRate, fileRef.getRelativePath(), storageManager, mergePolicyFactory,
+ mergePolicyProperties, btreeFields, opTrackerProvider, ioOpCallbackFactory, pageWriteCallbackFactory,
+ metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, compressorDecompressorFactory,
+ nullTypeTraits, nullIntrospector, isSecondaryNoIncrementalMaintenance, columnManagerFactory);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index d39f94e..d0e1e1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -40,8 +40,8 @@
private final IColumnBufferProvider[] primaryKeyBufferProviders;
private final IColumnBufferProvider[] buffersProviders;
private final int numberOfPrimaryKeys;
- private int totalNumberOfPages;
- private int numOfSkippedPages;
+ private int totalNumberOfMegaLeafNodes;
+ private int numOfSkippedMegaLeafNodes;
protected int tupleIndex;
/**
@@ -73,8 +73,8 @@
buffersProviders[i] = new ColumnSingleBufferProvider(columnIndex);
}
}
- totalNumberOfPages = 0;
- numOfSkippedPages = 0;
+ totalNumberOfMegaLeafNodes = 0;
+ numOfSkippedMegaLeafNodes = 0;
}
@Override
@@ -104,9 +104,9 @@
startColumn(provider, tupleIndex, i, numberOfTuples);
}
} else {
- numOfSkippedPages++;
+ numOfSkippedMegaLeafNodes++;
}
- totalNumberOfPages++;
+ totalNumberOfMegaLeafNodes++;
}
protected abstract boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples);
@@ -149,8 +149,9 @@
@Override
public final void close() {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Skipped {} pages out of {} in total", numOfSkippedPages, totalNumberOfPages);
+ if (LOGGER.isInfoEnabled() && numOfSkippedMegaLeafNodes > 0) {
+ LOGGER.info("Filtered {} disk mega-leaf nodes out of {} in total", numOfSkippedMegaLeafNodes,
+ totalNumberOfMegaLeafNodes);
}
}