[ASTERIXDB-3141][ASTERIXDB-3134] Allow querying columnar datasets
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details:
This patch adds the ability to query columnar datasets.
Also, it teaches the compiler to read only the requested
columns. This patch also includes the ability to filter
mega-leaf nodes given a query predicate.
Interface changes:
- IMetadataProvider#getScannerRuntime()
* To allow projections for both data records and meta records
- IProjectionInfo
* Renamed to IProjectionFiltrationInfo
* Added getFilterExpression() for columnar filters
User model changes:
- After this change you can create columnar datasets
Example:
CREATE DATASET ExperDataset(ExperType)
PRIMARY KEY uid AUTOGENERATED
WITH {
"dataset-format":{"format":"column"}
};
- Added compiler property:
* compiler.column.filter
to enable/disable the usage of columnar filters
- Added storage properties:
* storage.column.max.tuple.count
An integer specifying the maximum number of
tuples to store per mega-leaf node
* storage.column.free.space.tolerance
The percentage of tolerable empty space to
minimize column splitting
Change-Id: Ie9188bbd8463db22bf10c6871046c680528d5640
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17430
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index d350789..4596393 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -43,41 +43,42 @@
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
public interface IMetadataProvider<S, I> {
- public IDataSource<S> findDataSource(S id) throws AlgebricksException;
+ IDataSource<S> findDataSource(S id) throws AlgebricksException;
/**
* Obs: A scanner may choose to contribute a null
* AlgebricksPartitionConstraint and implement
* contributeSchedulingConstraints instead.
*/
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<S> dataSource,
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<S> dataSource,
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars,
ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
- IProjectionInfo<?> projectionInfo) throws AlgebricksException;
+ IProjectionFiltrationInfo<?> projectionInfo, IProjectionFiltrationInfo<?> metaProjectionInfo)
+ throws AlgebricksException;
- public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
- RecordDescriptor inputDesc) throws AlgebricksException;
+ Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink, int[] printColumns,
+ IPrinterFactory[] printerFactories, IAWriterFactory writerFactory, RecordDescriptor inputDesc)
+ throws AlgebricksException;
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink, int[] printColumns,
+ IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc,
IResultMetadata metadata, JobSpecification spec) throws AlgebricksException;
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
List<LogicalVariable> additionalNonKeyFields, JobGenContext context, JobSpecification jobSpec)
throws AlgebricksException;
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterKeyFields,
List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor inputRecordDesc, JobGenContext context,
JobSpecification jobSpec, boolean bulkload) throws AlgebricksException;
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<S> dataSource,
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields,
List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor inputRecordDesc, JobGenContext context,
@@ -87,42 +88,28 @@
* Creates the insert runtime of IndexInsertDeletePOperator, which models
* insert/delete operations into a secondary index.
*
- * @param dataSource
- * Target secondary index.
- * @param propagatedSchema
- * Output schema of the insert/delete operator to be created.
- * @param inputSchemas
- * Output schemas of the insert/delete operator to be created.
- * @param typeEnv
- * Type environment of the original IndexInsertDeleteOperator operator.
- * @param primaryKeys
- * Variables for the dataset's primary keys that the dataSource secondary index belongs to.
- * @param secondaryKeys
- * Variables for the secondary-index keys.
- * @param additionalNonKeyFields
- * Additional variables that can be passed to the secondary index as payload.
- * This can be useful when creating a second filter on a non-primary and non-secondary
- * fields for additional pruning power.
- * @param filterExpr
- * Filtering expression to be pushed inside the runtime op.
- * Such a filter may, e.g., exclude NULLs from being inserted/deleted.
- * @param recordDesc
- * Output record descriptor of the runtime op to be created.
- * @param context
- * Job generation context.
- * @param spec
- * Target job specification.
- * @param secondaryKeysPipelines
- * Nested plans to extract secondary keys.
- * @param pipelineTopSchema
- * Schema of the primary pipeline for secondary keys.
- * @return
- * A Hyracks IOperatorDescriptor and its partition constraint.
+ * @param dataSource Target secondary index.
+ * @param propagatedSchema Output schema of the insert/delete operator to be created.
+ * @param inputSchemas Output schemas of the insert/delete operator to be created.
+ * @param typeEnv Type environment of the original IndexInsertDeleteOperator operator.
+ * @param primaryKeys Variables for the dataset's primary keys that the dataSource secondary index belongs to.
+ * @param secondaryKeys Variables for the secondary-index keys.
+ * @param additionalNonKeyFields Additional variables that can be passed to the secondary index as payload.
+ * This can be useful when creating a second filter on a non-primary and non-secondary
+ * fields for additional pruning power.
+ * @param filterExpr Filtering expression to be pushed inside the runtime op.
+ * Such a filter may, e.g., exclude NULLs from being inserted/deleted.
+ * @param recordDesc Output record descriptor of the runtime op to be created.
+ * @param context Job generation context.
+ * @param spec Target job specification.
+ * @param secondaryKeysPipelines Nested plans to extract secondary keys.
+ * @param pipelineTopSchema Schema of the primary pipeline for secondary keys.
+ * @return A Hyracks IOperatorDescriptor and its partition constraint.
* @throws AlgebricksException
*/
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
- IDataSourceIndex<I, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(IDataSourceIndex<I, S> dataSource,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec, boolean bulkload,
List<List<AlgebricksPipeline>> secondaryKeysPipelines, IOperatorSchema pipelineTopSchema)
@@ -132,42 +119,28 @@
* Creates the delete runtime of IndexInsertDeletePOperator, which models
* insert/delete operations into a secondary index.
*
- * @param dataSource
- * Target secondary index.
- * @param propagatedSchema
- * Output schema of the insert/delete operator to be created.
- * @param inputSchemas
- * Output schemas of the insert/delete operator to be created.
- * @param typeEnv
- * Type environment of the original IndexInsertDeleteOperator operator.
- * @param primaryKeys
- * Variables for the dataset's primary keys that the dataSource secondary index belongs to.
- * @param secondaryKeys
- * Variables for the secondary-index keys.
- * @param additionalNonKeyFields
- * Additional variables that can be passed to the secondary index as payload.
- * This can be useful when creating a second filter on a non-primary and non-secondary
- * fields for additional pruning power.
- * @param filterExpr
- * Filtering expression to be pushed inside the runtime op.
- * Such a filter may, e.g., exclude NULLs from being inserted/deleted.
- * @param recordDesc
- * Output record descriptor of the runtime op to be created.
- * @param context
- * Job generation context.
- * @param spec
- * Target job specification.
- * @param secondaryKeysPipelines
- * Nested plan to extract secondary keys.
- * @param pipelineTopSchema
- * Schema of the primary pipeline for secondary keys.
- * @return
- * A Hyracks IOperatorDescriptor and its partition constraint.
+ * @param dataSource Target secondary index.
+ * @param propagatedSchema Output schema of the insert/delete operator to be created.
+ * @param inputSchemas Output schemas of the insert/delete operator to be created.
+ * @param typeEnv Type environment of the original IndexInsertDeleteOperator operator.
+ * @param primaryKeys Variables for the dataset's primary keys that the dataSource secondary index belongs to.
+ * @param secondaryKeys Variables for the secondary-index keys.
+ * @param additionalNonKeyFields Additional variables that can be passed to the secondary index as payload.
+ * This can be useful when creating a second filter on a non-primary and non-secondary
+ * fields for additional pruning power.
+ * @param filterExpr Filtering expression to be pushed inside the runtime op.
+ * Such a filter may, e.g., exclude NULLs from being inserted/deleted.
+ * @param recordDesc Output record descriptor of the runtime op to be created.
+ * @param context Job generation context.
+ * @param spec Target job specification.
+ * @param secondaryKeysPipelines Nested plan to extract secondary keys.
+ * @param pipelineTopSchema Schema of the primary pipeline for secondary keys.
+ * @return A Hyracks IOperatorDescriptor and its partition constraint.
* @throws AlgebricksException
*/
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
- IDataSourceIndex<I, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(IDataSourceIndex<I, S> dataSource,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec, List<List<AlgebricksPipeline>> secondaryKeysPipelines,
IOperatorSchema pipelineTopSchema) throws AlgebricksException;
@@ -177,48 +150,37 @@
* secondary key into [token, number of token] pair in a length-partitioned index.
* In case of non length-partitioned index, it tokenizes secondary key into [token].
*
- * @param dataSource
- * Target secondary index.
- * @param propagatedSchema
- * Output schema of the insert/delete operator to be created.
- * @param inputSchemas
- * Output schemas of the insert/delete operator to be created.
- * @param typeEnv
- * Type environment of the original IndexInsertDeleteOperator operator.
- * @param primaryKeys
- * Variables for the dataset's primary keys that the dataSource secondary index belongs to.
- * @param secondaryKeys
- * Variables for the secondary-index keys.
- * @param filterExpr
- * Filtering expression to be pushed inside the runtime op.
- * Such a filter may, e.g., exclude NULLs from being inserted/deleted.
- * @param recordDesc
- * Output record descriptor of the runtime op to be created.
- * @param context
- * Job generation context.
- * @param spec
- * Target job specification.
- * @return
- * A Hyracks IOperatorDescriptor and its partition constraint.
+ * @param dataSource Target secondary index.
+ * @param propagatedSchema Output schema of the insert/delete operator to be created.
+ * @param inputSchemas Output schemas of the insert/delete operator to be created.
+ * @param typeEnv Type environment of the original IndexInsertDeleteOperator operator.
+ * @param primaryKeys Variables for the dataset's primary keys that the dataSource secondary index belongs to.
+ * @param secondaryKeys Variables for the secondary-index keys.
+ * @param filterExpr Filtering expression to be pushed inside the runtime op.
+ * Such a filter may, e.g., exclude NULLs from being inserted/deleted.
+ * @param recordDesc Output record descriptor of the runtime op to be created.
+ * @param context Job generation context.
+ * @param spec Target job specification.
+ * @return A Hyracks IOperatorDescriptor and its partition constraint.
* @throws AlgebricksException
*/
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
- IDataSourceIndex<I, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
- boolean bulkload) throws AlgebricksException;
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(IDataSourceIndex<I, S> dataSource,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
+ RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload)
+ throws AlgebricksException;
- public IDataSourceIndex<I, S> findDataSourceIndex(I indexId, S dataSourceId) throws AlgebricksException;
+ IDataSourceIndex<I, S> findDataSourceIndex(I indexId, S dataSourceId) throws AlgebricksException;
- public IFunctionInfo lookupFunction(FunctionIdentifier fid);
+ IFunctionInfo lookupFunction(FunctionIdentifier fid);
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<S> dataSource,
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<S> dataSource,
IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
LogicalVariable payLoadVar, List<LogicalVariable> additionalFilterFields,
List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor recordDesc, JobGenContext context,
JobSpecification jobSpec) throws AlgebricksException;
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalFilteringKeys, ILogicalExpression filterExpr,
@@ -226,12 +188,11 @@
LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor inputDesc, JobGenContext context,
JobSpecification spec, List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException;
- public ITupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
- throws AlgebricksException;
+ ITupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
+ ILogicalExpression filterExpr, JobGenContext context) throws AlgebricksException;
- public Map<String, Object> getConfig();
+ Map<String, Object> getConfig();
- public boolean isBlockingOperatorDisabled();
+ boolean isBlockingOperatorDisabled();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionInfo.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionFiltrationInfo.java
similarity index 80%
rename from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionInfo.java
rename to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionFiltrationInfo.java
index 3c1a24d..9973d08 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionInfo.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionFiltrationInfo.java
@@ -18,18 +18,22 @@
*/
package org.apache.hyracks.algebricks.core.algebra.metadata;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+
/**
* Generic interface to include the projection information for
* {@link org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator}
*/
-public interface IProjectionInfo<T> {
+public interface IProjectionFiltrationInfo<T> {
/**
* @return projected values' information
*/
T getProjectionInfo();
+ ILogicalExpression getFilterExpression();
+
/**
- * @return a copy of the {@link IProjectionInfo}
+ * @return a copy of the {@link IProjectionFiltrationInfo}
*/
- IProjectionInfo<T> createCopy();
+ IProjectionFiltrationInfo<T> createCopy();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
index 9f73113..e3ce82d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
@@ -29,7 +29,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
@@ -50,19 +50,22 @@
// the maximum of number of results output by this operator
private long outputLimit = -1;
- private IProjectionInfo<?> projectionInfo;
+ private IProjectionFiltrationInfo<?> datasetProjectionInfo;
+ private IProjectionFiltrationInfo<?> metaProjectionInfo;
public DataSourceScanOperator(List<LogicalVariable> variables, IDataSource<?> dataSource) {
- this(variables, dataSource, null, -1, null);
+ this(variables, dataSource, null, -1, null, null);
}
public DataSourceScanOperator(List<LogicalVariable> variables, IDataSource<?> dataSource,
- Mutable<ILogicalExpression> selectCondition, long outputLimit, IProjectionInfo<?> projectionInfo) {
+ Mutable<ILogicalExpression> selectCondition, long outputLimit,
+ IProjectionFiltrationInfo<?> datasetProjectionInfo, IProjectionFiltrationInfo<?> metaProjectionInfo) {
super(variables, dataSource);
projectVars = new ArrayList<>();
this.selectCondition = selectCondition;
this.outputLimit = outputLimit;
- this.projectionInfo = projectionInfo;
+ this.datasetProjectionInfo = datasetProjectionInfo;
+ this.metaProjectionInfo = metaProjectionInfo;
}
@Override
@@ -173,11 +176,19 @@
this.outputLimit = outputLimit;
}
- public void setProjectionInfo(IProjectionInfo<?> projectionInfo) {
- this.projectionInfo = projectionInfo;
+ public void setDatasetProjectionInfo(IProjectionFiltrationInfo<?> datasetProjectionInfo) {
+ this.datasetProjectionInfo = datasetProjectionInfo;
}
- public IProjectionInfo<?> getProjectionInfo() {
- return projectionInfo;
+ public IProjectionFiltrationInfo<?> getDatasetProjectionInfo() {
+ return datasetProjectionInfo;
+ }
+
+ public void setMetaProjectionInfo(IProjectionFiltrationInfo<?> metaProjectionInfo) {
+ this.metaProjectionInfo = metaProjectionInfo;
+ }
+
+ public IProjectionFiltrationInfo<?> getMetaProjectionInfo() {
+ return metaProjectionInfo;
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
index f8d07b8..6d11931 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.algebra.typing.NonPropagatingTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
@@ -39,14 +40,18 @@
// the maximum of number of results output by this operator
private long outputLimit = -1;
+ private IProjectionFiltrationInfo<?> datasetProjectionInfo;
+ private IProjectionFiltrationInfo<?> metaProjectionInfo;
+
public UnnestMapOperator(List<LogicalVariable> variables, Mutable<ILogicalExpression> expression,
List<Object> variableTypes, boolean propagateInput) {
- this(variables, expression, variableTypes, propagateInput, null, -1);
+ this(variables, expression, variableTypes, propagateInput, null, -1, null, null);
}
public UnnestMapOperator(List<LogicalVariable> variables, Mutable<ILogicalExpression> expression,
List<Object> variableTypes, boolean propagateInput, Mutable<ILogicalExpression> selectCondition,
- long outputLimit) {
+ long outputLimit, IProjectionFiltrationInfo<?> datasetProjectionInfo,
+ IProjectionFiltrationInfo<?> metaProjectionInfo) {
super(variables, expression, variableTypes, propagateInput);
this.selectCondition = selectCondition;
this.outputLimit = outputLimit;
@@ -101,4 +106,20 @@
this.outputLimit = outputLimit;
}
+ public void setDatasetProjectionInfo(IProjectionFiltrationInfo<?> projectionInfo) {
+ this.datasetProjectionInfo = projectionInfo;
+ }
+
+ public IProjectionFiltrationInfo<?> getDatasetProjectionInfo() {
+ return datasetProjectionInfo;
+ }
+
+ public void setMetaProjectionInfo(IProjectionFiltrationInfo<?> metaProjectionInfo) {
+ this.metaProjectionInfo = metaProjectionInfo;
+ }
+
+ public IProjectionFiltrationInfo<?> getMetaProjectionInfo() {
+ return metaProjectionInfo;
+ }
+
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index cfae695..5a6574a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -447,7 +447,9 @@
if (!isomorphic) {
return Boolean.FALSE;
}
- isomorphic = op.getExpressionRef().getValue().equals(unnestOpArg.getExpressionRef().getValue());
+ isomorphic = op.getExpressionRef().getValue().equals(unnestOpArg.getExpressionRef().getValue())
+ && Objects.equals(op.getDatasetProjectionInfo(), unnestOpArg.getDatasetProjectionInfo())
+ && Objects.equals(op.getMetaProjectionInfo(), unnestOpArg.getMetaProjectionInfo());
return isomorphic;
}
@@ -480,7 +482,8 @@
DataSourceScanOperator argScan = (DataSourceScanOperator) arg;
boolean isomorphic = op.getDataSource().getId().equals(argScan.getDataSource().getId())
&& op.getOutputLimit() == argScan.getOutputLimit()
- && Objects.equals(op.getProjectionInfo(), argScan.getProjectionInfo());
+ && Objects.equals(op.getDatasetProjectionInfo(), argScan.getDatasetProjectionInfo())
+ && Objects.equals(op.getMetaProjectionInfo(), argScan.getMetaProjectionInfo());
if (!isomorphic) {
return Boolean.FALSE;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 08acf13..71a659b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -37,7 +37,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.IVariableContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -104,22 +104,17 @@
private final boolean reuseFreeVars;
/**
- * @param varContext
- * , the variable context.
- * @param typeContext
- * the type context.
+ * @param varContext , the variable context.
+ * @param typeContext the type context.
*/
public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext varContext, ITypingContext typeContext) {
this(varContext, typeContext, new LinkedHashMap<>(), false);
}
/**
- * @param varContext
- * , the variable context.
- * @param typeContext
- * the type context.
- * @param reuseFreeVars
- * whether free variables in the given plan tree should be reused.
+ * @param varContext , the variable context.
+ * @param typeContext the type context.
+ * @param reuseFreeVars whether free variables in the given plan tree should be reused.
*/
public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext varContext, ITypingContext typeContext,
boolean reuseFreeVars) {
@@ -127,16 +122,12 @@
}
/**
- * @param varContext
- * , the variable context.
- * @param typeContext
- * the type context.
- * @param inVarMapping
- * Variable mapping keyed by variables in the original plan.
- * Those variables are replaced by their corresponding value in
- * the map in the copied plan.
- * @param reuseFreeVars
- * whether free variables in the given plan tree should be reused.
+ * @param varContext , the variable context.
+ * @param typeContext the type context.
+ * @param inVarMapping Variable mapping keyed by variables in the original plan.
+ * Those variables are replaced by their corresponding value in
+ * the map in the copied plan.
+ * @param reuseFreeVars whether free variables in the given plan tree should be reused.
*/
public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext varContext, ITypingContext typeContext,
LinkedHashMap<LogicalVariable, LogicalVariable> inVarMapping, boolean reuseFreeVars) {
@@ -326,9 +317,12 @@
throws AlgebricksException {
Mutable<ILogicalExpression> newSelectCondition = op.getSelectCondition() != null
? exprDeepCopyVisitor.deepCopyExpressionReference(op.getSelectCondition()) : null;
- IProjectionInfo<?> projectionInfo = op.getProjectionInfo() != null ? op.getProjectionInfo().createCopy() : null;
+ IProjectionFiltrationInfo<?> datasetProjectionInfo =
+ op.getDatasetProjectionInfo() != null ? op.getDatasetProjectionInfo().createCopy() : null;
+ IProjectionFiltrationInfo<?> metaProjectionInfo =
+ op.getMetaProjectionInfo() != null ? op.getMetaProjectionInfo().createCopy() : null;
DataSourceScanOperator opCopy = new DataSourceScanOperator(deepCopyVariableList(op.getVariables()),
- op.getDataSource(), newSelectCondition, op.getOutputLimit(), projectionInfo);
+ op.getDataSource(), newSelectCondition, op.getOutputLimit(), datasetProjectionInfo, metaProjectionInfo);
deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
return opCopy;
}
@@ -540,9 +534,14 @@
throws AlgebricksException {
Mutable<ILogicalExpression> newSelectCondition = op.getSelectCondition() != null
? exprDeepCopyVisitor.deepCopyExpressionReference(op.getSelectCondition()) : null;
+ IProjectionFiltrationInfo<?> datasetProjectionInfo =
+ op.getDatasetProjectionInfo() != null ? op.getDatasetProjectionInfo().createCopy() : null;
+ IProjectionFiltrationInfo<?> metaProjectionInfo =
+ op.getMetaProjectionInfo() != null ? op.getMetaProjectionInfo().createCopy() : null;
UnnestMapOperator opCopy = new UnnestMapOperator(deepCopyVariableList(op.getVariables()),
exprDeepCopyVisitor.deepCopyExpressionReference(op.getExpressionRef()), op.getVariableTypes(),
- op.propagatesInput(), newSelectCondition, op.getOutputLimit());
+ op.propagatesInput(), newSelectCondition, op.getOutputLimit(), datasetProjectionInfo,
+ metaProjectionInfo);
deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
return opCopy;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 8ef0b5b..b7029d1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -33,7 +33,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -245,8 +245,13 @@
newInputList.addAll(op.getVariables());
Mutable<ILogicalExpression> newSelectCondition =
op.getSelectCondition() != null ? deepCopyExpressionRef(op.getSelectCondition()) : null;
+ IProjectionFiltrationInfo<?> datasetProjectionInfo =
+ op.getDatasetProjectionInfo() != null ? op.getDatasetProjectionInfo().createCopy() : null;
+ IProjectionFiltrationInfo<?> metaProjectionInfo =
+ op.getMetaProjectionInfo() != null ? op.getMetaProjectionInfo().createCopy() : null;
return new UnnestMapOperator(newInputList, deepCopyExpressionRef(op.getExpressionRef()),
- new ArrayList<>(op.getVariableTypes()), op.propagatesInput(), newSelectCondition, op.getOutputLimit());
+ new ArrayList<>(op.getVariableTypes()), op.propagatesInput(), newSelectCondition, op.getOutputLimit(),
+ datasetProjectionInfo, metaProjectionInfo);
}
@Override
@@ -264,10 +269,13 @@
newInputList.addAll(op.getVariables());
Mutable<ILogicalExpression> newSelectCondition =
op.getSelectCondition() != null ? deepCopyExpressionRef(op.getSelectCondition()) : null;
- IProjectionInfo<?> projectionInfo = op.getProjectionInfo() != null ? op.getProjectionInfo().createCopy() : null;
+ IProjectionFiltrationInfo<?> datasetProjectionInfo =
+ op.getDatasetProjectionInfo() != null ? op.getDatasetProjectionInfo().createCopy() : null;
+ IProjectionFiltrationInfo<?> metaProjectionInfo =
+ op.getMetaProjectionInfo() != null ? op.getMetaProjectionInfo().createCopy() : null;
return new DataSourceScanOperator(newInputList, op.getDataSource(), newSelectCondition, op.getOutputLimit(),
- projectionInfo);
+ datasetProjectionInfo, metaProjectionInfo);
}
@Override
@@ -379,7 +387,7 @@
private void deepCopyExpressionRefs(List<Mutable<ILogicalExpression>> newExprs,
List<Mutable<ILogicalExpression>> oldExprs) {
for (Mutable<ILogicalExpression> oldExpr : oldExprs) {
- newExprs.add(new MutableObject<>(((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression()));
+ newExprs.add(new MutableObject<>(oldExpr.getValue().cloneExpression()));
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
index 48dc607..866334a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
@@ -116,10 +116,10 @@
scan.getSelectCondition().getValue(), context);
}
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p =
- mp.getScannerRuntime(dataSource, vars, projectVars, scan.isProjectPushed(), scan.getMinFilterVars(),
- scan.getMaxFilterVars(), tupleFilterFactory, scan.getOutputLimit(), opSchema, typeEnv, context,
- builder.getJobSpec(), implConfig, scan.getProjectionInfo());
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = mp.getScannerRuntime(dataSource, vars, projectVars,
+ scan.isProjectPushed(), scan.getMinFilterVars(), scan.getMaxFilterVars(), tupleFilterFactory,
+ scan.getOutputLimit(), opSchema, typeEnv, context, builder.getJobSpec(), implConfig,
+ scan.getDatasetProjectionInfo(), scan.getMetaProjectionInfo());
IOperatorDescriptor opDesc = p.first;
opDesc.setSourceLocation(scan.getSourceLocation());
builder.contributeHyracksOperator(scan, opDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 7aabbef..069012b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -33,7 +33,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
@@ -359,6 +359,9 @@
AlgebricksStringBuilderWriter plan = printAbstractUnnestMapOperator(op, indent, "unnest-map", null);
appendSelectConditionInformation(plan, op.getSelectCondition(), indent);
appendLimitInformation(plan, op.getOutputLimit());
+ appendProjectInformation(plan, "project", op.getDatasetProjectionInfo());
+ appendProjectInformation(plan, "project-meta", op.getMetaProjectionInfo());
+ appendFilterExpression(plan, op.getDatasetProjectionInfo());
return null;
}
@@ -386,7 +389,9 @@
appendFilterInformation(plan, op.getMinFilterVars(), op.getMaxFilterVars());
appendSelectConditionInformation(plan, op.getSelectCondition(), indent);
appendLimitInformation(plan, op.getOutputLimit());
- appendProjectInformation(plan, op.getProjectionInfo());
+ appendProjectInformation(plan, "project", op.getDatasetProjectionInfo());
+ appendProjectInformation(plan, "project-meta", op.getMetaProjectionInfo());
+ appendFilterExpression(plan, op.getDatasetProjectionInfo());
return null;
}
@@ -417,15 +422,30 @@
}
}
- private void appendProjectInformation(AlgebricksStringBuilderWriter plan, IProjectionInfo<?> projectionInfo) {
+ private void appendProjectInformation(AlgebricksStringBuilderWriter plan, String projectionSource,
+ IProjectionFiltrationInfo<?> projectionInfo) {
final String projectedFields = projectionInfo == null ? "" : projectionInfo.toString();
if (!projectedFields.isEmpty()) {
- plan.append(" project (");
+ plan.append(" ");
+ plan.append(projectionSource);
+ plan.append(" (");
plan.append(projectedFields);
plan.append(")");
}
}
+ private void appendFilterExpression(AlgebricksStringBuilderWriter plan,
+ IProjectionFiltrationInfo<?> projectionInfo) {
+ final String filterExpr = projectionInfo == null || projectionInfo.getFilterExpression() == null ? ""
+ : projectionInfo.getFilterExpression().toString();
+ if (!filterExpr.isEmpty()) {
+ plan.append(" filter on ");
+ plan.append("(");
+ plan.append(filterExpr);
+ plan.append(")");
+ }
+ }
+
@Override
public Void visitLimitOperator(LimitOperator op, Integer indent) throws AlgebricksException {
addIndent(indent).append("limit");
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index e54ef02..115448e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -38,7 +38,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionInfo;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
@@ -545,6 +545,9 @@
try {
writeUnnestMapOperator(op, indent, "unnest-map", null);
writeSelectLimitInformation(op.getSelectCondition(), op.getOutputLimit(), indent);
+ writeProjectInformation("project", op.getDatasetProjectionInfo());
+ writeProjectInformation("project-meta", op.getMetaProjectionInfo());
+ writeFilterInformation(op.getDatasetProjectionInfo());
return null;
} catch (IOException e) {
throw AlgebricksException.create(ErrorCode.ERROR_PRINTING_PLAN, e, String.valueOf(e));
@@ -574,7 +577,9 @@
}
writeFilterInformation(op.getMinFilterVars(), op.getMaxFilterVars());
writeSelectLimitInformation(op.getSelectCondition(), op.getOutputLimit(), indent);
- writeProjectInformation(op.getProjectionInfo());
+ writeProjectInformation("project", op.getDatasetProjectionInfo());
+ writeProjectInformation("project-meta", op.getMetaProjectionInfo());
+ writeFilterInformation(op.getDatasetProjectionInfo());
return null;
} catch (IOException e) {
throw AlgebricksException.create(ErrorCode.ERROR_PRINTING_PLAN, e, String.valueOf(e));
@@ -903,10 +908,19 @@
}
}
- private void writeProjectInformation(IProjectionInfo<?> projectionInfo) throws IOException {
+ private void writeProjectInformation(String projectionSource, IProjectionFiltrationInfo<?> projectionInfo)
+ throws IOException {
final String projectedFields = projectionInfo == null ? "" : projectionInfo.toString();
if (!projectedFields.isEmpty()) {
- jsonGenerator.writeStringField("project", projectedFields);
+ jsonGenerator.writeStringField(projectionSource, projectedFields);
+ }
+ }
+
+ private void writeFilterInformation(IProjectionFiltrationInfo<?> projectionInfo) throws IOException {
+ final String filterExpr = projectionInfo == null || projectionInfo.getFilterExpression() == null ? ""
+ : projectionInfo.getFilterExpression().toString();
+ if (!filterExpr.isEmpty()) {
+ jsonGenerator.writeStringField("filter-on", filterExpr);
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
index c94d72a..0d02203 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
@@ -45,4 +45,5 @@
public static final int EXTERNAL_SCAN_BUFFER_SIZE =
StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.KILOBYTE);
public static final boolean BATCH_LOOKUP_DEFAULT = false;
+ public static final boolean COLUMN_FILTER_DEFAULT = false;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index 86be6d0..3e51a80 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -52,6 +52,7 @@
private static final String CBO_TEST = "CBO_TEST";
private static final String FORCE_JOIN_ORDER = "FORCE_JOIN_ORDER";
private static final String QUERY_PLAN_SHAPE = "QUERY_PLAN_SHAPE";
+ private static final String COLUMN_FILTER = "COLUMN_FILTER";
private final Properties properties = new Properties();
@@ -294,6 +295,14 @@
setInt(EXTERNAL_SCAN_BUFFER_SIZE, bufferSize);
}
+ public void setColumnFilter(boolean columnFilter) {
+ setBoolean(COLUMN_FILTER, columnFilter);
+ }
+
+ public boolean isColumnFilterEnabled() {
+ return getBoolean(COLUMN_FILTER, AlgebricksConfig.COLUMN_FILTER_DEFAULT);
+ }
+
private void setInt(String property, int value) {
properties.setProperty(property, Integer.toString(value));
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index d555b31..4fc8057 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -243,12 +243,6 @@
cursor.next();
matchingTupleCount++;
ITupleReference tuple = cursor.getTuple();
- if (tupleFilter != null) {
- referenceFilterTuple.reset(tuple);
- if (!tupleFilter.accept(referenceFilterTuple)) {
- continue;
- }
- }
tb.reset();
if (retainInput) {
@@ -258,7 +252,17 @@
tb.addFieldEndOffset();
}
}
- writeTupleToOutput(tuple);
+
+ // tuple must be written first before the filter is applied to
+ // assemble columnar tuples
+ tuple = writeTupleToOutput(tuple);
+ if (tupleFilter != null) {
+ referenceFilterTuple.reset(tuple);
+ if (!tupleFilter.accept(referenceFilterTuple)) {
+ continue;
+ }
+ }
+
if (appendSearchCallbackProceedResult) {
writeSearchCallbackProceedResult(tb,
((ILSMIndexCursor) cursor).getSearchOperationCallbackProceedResult());
@@ -355,9 +359,9 @@
}
}
- protected void writeTupleToOutput(ITupleReference tuple) throws IOException {
+ protected ITupleReference writeTupleToOutput(ITupleReference tuple) throws IOException {
try {
- tupleProjector.project(tuple, dos, tb);
+ return tupleProjector.project(tuple, dos, tb);
} catch (Exception e) {
throw e;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
index cadc714..8e0bc0c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
@@ -91,5 +91,9 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
new file mode 100644
index 0000000..56090bb
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.dataflow;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.io.IJsonSerializable;
+import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.INullIntrospector;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManagerFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.utils.LSMColumnBTreeUtil;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResource;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
+import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class LSMColumnBTreeLocalResource extends LSMBTreeLocalResource {
+ private final IColumnManagerFactory columnManagerFactory;
+
+ public LSMColumnBTreeLocalResource(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
+ int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, String path,
+ IStorageManager storageManager, ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties, int[] btreeFields, ILSMOperationTrackerFactory opTrackerProvider,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
+ IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
+ ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+ ICompressorDecompressorFactory compressorDecompressorFactory, ITypeTraits nullTypeTraits,
+ INullIntrospector nullIntrospector, boolean isSecondaryNoIncrementalMaintenance,
+ IColumnManagerFactory columnManagerFactory) {
+ super(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, true, path, storageManager,
+ mergePolicyFactory, mergePolicyProperties, null, null, btreeFields, null, opTrackerProvider,
+ ioOpCallbackFactory, pageWriteCallbackFactory, metadataPageManagerFactory, vbcProvider,
+ ioSchedulerProvider, true, compressorDecompressorFactory, true, nullTypeTraits, nullIntrospector,
+ isSecondaryNoIncrementalMaintenance);
+ this.columnManagerFactory = columnManagerFactory;
+ }
+
+ private LSMColumnBTreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields,
+ double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields,
+ ICompressorDecompressorFactory compressorDecompressorFactory, boolean hasBloomFilter,
+ boolean isSecondaryNoIncrementalMaintenance, IColumnManagerFactory columnManagerFactory)
+ throws HyracksDataException {
+ super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields,
+ compressorDecompressorFactory, hasBloomFilter, isSecondaryNoIncrementalMaintenance);
+ this.columnManagerFactory = columnManagerFactory;
+ }
+
+ @Override
+ public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
+ IIOManager ioManager = serviceCtx.getIoManager();
+ FileReference file = ioManager.resolve(path);
+ List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
+ ioOpCallbackFactory.initialize(serviceCtx, this);
+ pageWriteCallbackFactory.initialize(serviceCtx, this);
+ return LSMColumnBTreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx),
+ typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
+ opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
+ ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields, metadataPageManagerFactory, false,
+ serviceCtx.getTracer(), compressorDecompressorFactory, nullTypeTraits, nullIntrospector,
+ columnManagerFactory);
+ }
+
+ public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json)
+ throws HyracksDataException {
+ int[] bloomFilterKeyFields = OBJECT_MAPPER.convertValue(json.get("bloomFilterKeyFields"), int[].class);
+ double bloomFilterFalsePositiveRate = json.get("bloomFilterFalsePositiveRate").asDouble();
+ boolean isPrimary = json.get("isPrimary").asBoolean();
+ boolean hasBloomFilter = getOrDefaultHasBloomFilter(json, isPrimary);
+ int[] btreeFields = OBJECT_MAPPER.convertValue(json.get("btreeFields"), int[].class);
+ JsonNode compressorDecompressorNode = json.get("compressorDecompressorFactory");
+ ICompressorDecompressorFactory compDecompFactory = (ICompressorDecompressorFactory) registry
+ .deserializeOrDefault(compressorDecompressorNode, NoOpCompressorDecompressorFactory.class);
+ JsonNode columnManagerFactoryNode = json.get("columnManagerFactory");
+ boolean isSecondaryNoIncrementalMaintenance =
+ getOrDefaultBoolean(json, "isSecondaryNoIncrementalMaintenance", false);
+ IColumnManagerFactory columnManagerFactory =
+ (IColumnManagerFactory) registry.deserialize(columnManagerFactoryNode);
+ return new LSMColumnBTreeLocalResource(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
+ isPrimary, btreeFields, compDecompFactory, hasBloomFilter, isSecondaryNoIncrementalMaintenance,
+ columnManagerFactory);
+ }
+
+ @Override
+ protected void appendToJson(final ObjectNode json, IPersistedResourceRegistry registry)
+ throws HyracksDataException {
+ super.appendToJson(json, registry);
+ json.putPOJO("columnManagerFactory", columnManagerFactory.toJson(registry));
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
new file mode 100644
index 0000000..eccb7c2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.dataflow;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
+import org.apache.hyracks.storage.am.common.api.INullIntrospector;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManagerFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResourceFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
+import org.apache.hyracks.storage.common.IStorageManager;
+
+public class LSMColumnBTreeLocalResourceFactory extends LSMBTreeLocalResourceFactory {
+ private static final long serialVersionUID = -676367767925618165L;
+ private final IColumnManagerFactory columnManagerFactory;
+
+ public LSMColumnBTreeLocalResourceFactory(IStorageManager storageManager, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] cmpFactories, ITypeTraits[] filterTypeTraits,
+ IBinaryComparatorFactory[] filterCmpFactories, int[] filterFields,
+ ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
+ ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
+ IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider,
+ ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties, int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate,
+ int[] btreeFields, ICompressorDecompressorFactory compressorDecompressorFactory, ITypeTraits nullTypeTraits,
+ INullIntrospector nullIntrospector, boolean isSecondaryNoIncrementalMaintenance,
+ IColumnManagerFactory columnManagerFactory) {
+ super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields,
+ opTrackerFactory, ioOpCallbackFactory, pageWriteCallbackFactory, metadataPageManagerFactory,
+ vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, true, bloomFilterKeyFields,
+ bloomFilterFalsePositiveRate, true, btreeFields, compressorDecompressorFactory, true, nullTypeTraits,
+ nullIntrospector, isSecondaryNoIncrementalMaintenance);
+ this.columnManagerFactory = columnManagerFactory;
+ }
+
+ @Override
+ public LsmResource createResource(FileReference fileRef) {
+ return new LSMColumnBTreeLocalResource(typeTraits, cmpFactories, bloomFilterKeyFields,
+ bloomFilterFalsePositiveRate, fileRef.getRelativePath(), storageManager, mergePolicyFactory,
+ mergePolicyProperties, btreeFields, opTrackerProvider, ioOpCallbackFactory, pageWriteCallbackFactory,
+ metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, compressorDecompressorFactory,
+ nullTypeTraits, nullIntrospector, isSecondaryNoIncrementalMaintenance, columnManagerFactory);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index d39f94e..d0e1e1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -40,8 +40,8 @@
private final IColumnBufferProvider[] primaryKeyBufferProviders;
private final IColumnBufferProvider[] buffersProviders;
private final int numberOfPrimaryKeys;
- private int totalNumberOfPages;
- private int numOfSkippedPages;
+ private int totalNumberOfMegaLeafNodes;
+ private int numOfSkippedMegaLeafNodes;
protected int tupleIndex;
/**
@@ -73,8 +73,8 @@
buffersProviders[i] = new ColumnSingleBufferProvider(columnIndex);
}
}
- totalNumberOfPages = 0;
- numOfSkippedPages = 0;
+ totalNumberOfMegaLeafNodes = 0;
+ numOfSkippedMegaLeafNodes = 0;
}
@Override
@@ -104,9 +104,9 @@
startColumn(provider, tupleIndex, i, numberOfTuples);
}
} else {
- numOfSkippedPages++;
+ numOfSkippedMegaLeafNodes++;
}
- totalNumberOfPages++;
+ totalNumberOfMegaLeafNodes++;
}
protected abstract boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples);
@@ -149,8 +149,9 @@
@Override
public final void close() {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Skipped {} pages out of {} in total", numOfSkippedPages, totalNumberOfPages);
+ if (LOGGER.isInfoEnabled() && numOfSkippedMegaLeafNodes > 0) {
+ LOGGER.info("Filtered {} disk mega-leaf nodes out of {} in total", numOfSkippedMegaLeafNodes,
+ totalNumberOfMegaLeafNodes);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index a7e433c..21c818c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -158,12 +158,12 @@
json.put("isSecondaryNoIncrementalMaintenance", isSecondaryNoIncrementalMaintenance);
}
- private static boolean getOrDefaultHasBloomFilter(JsonNode json, boolean isPrimary) {
+ protected static boolean getOrDefaultHasBloomFilter(JsonNode json, boolean isPrimary) {
// for backward compatibility, only primary indexes have bloom filters
return getOrDefaultBoolean(json, HAS_BLOOM_FILTER_FIELD, isPrimary);
}
- private static boolean getOrDefaultBoolean(JsonNode jsonNode, String fieldName, boolean defaultValue) {
+ protected static boolean getOrDefaultBoolean(JsonNode jsonNode, String fieldName, boolean defaultValue) {
return jsonNode.has(fieldName) ? jsonNode.get(fieldName).asBoolean() : defaultValue;
}