[ASTERIXDB-2194][COMP] Introduce datasource functions
- user model changes: yes
Some functions can be datasources
- storage format changes: no
- interface changes: yes
- Add IDatasourceFunction: A function that is also a datasource
- Add IFunctionToDataSourceTransformer: transform an unnest
function into a datascan during compilation
Details:
- Currently, functions are location agnostic and are run on
parameters that are either passed through them during compile
time or runtime.
- An exception to this is the dataset function which has
an associated location constraints running on the nodes
which host the dataset.
- In this change, we introduce a general framework that allows
creation of new functions similar to the dataset function.
- Such functions are called datasource Functions.
- A datasource function takes constant parameters and run on
a set of partitions similar to the dataset function.
- The first example of such functions is the DatasetResources
function.
- The DatasetResources function takes two parameters, a dataverse
and a dataset. It is then run on all nodes and returns a set
of dataset resources.
- Test cases are added for this function.
Change-Id: Ibcf807ac713a21e8f4d59868525467386e801303
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2216
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Tested-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index 24fe8e78..8dca64b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -84,14 +84,6 @@
return fieldAccessFunctions.contains(fid);
}
- public static boolean isDataSetCall(ILogicalExpression e) {
- if (((AbstractLogicalExpression) e).getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
- return false;
- }
- AbstractFunctionCallExpression fe = (AbstractFunctionCallExpression) e;
- return BuiltinFunctions.isDatasetFunction(fe.getFunctionIdentifier());
- }
-
public static boolean isRunnableAccessToFieldRecord(ILogicalExpression expr) {
if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
AbstractFunctionCallExpression fc = (AbstractFunctionCallExpression) expr;
@@ -129,17 +121,17 @@
public static Pair<String, String> getDatasetInfo(AbstractDataSourceOperator op) throws AlgebricksException {
DataSourceId srcId = (DataSourceId) op.getDataSource().getId();
- return new Pair<String, String>(srcId.getDataverseName(), srcId.getDatasourceName());
+ return new Pair<>(srcId.getDataverseName(), srcId.getDatasourceName());
}
public static Pair<String, String> getExternalDatasetInfo(UnnestMapOperator op) throws AlgebricksException {
AbstractFunctionCallExpression unnestExpr = (AbstractFunctionCallExpression) op.getExpressionRef().getValue();
String dataverseName = AccessMethodUtils.getStringConstant(unnestExpr.getArguments().get(0));
String datasetName = AccessMethodUtils.getStringConstant(unnestExpr.getArguments().get(1));
- return new Pair<String, String>(dataverseName, datasetName);
+ return new Pair<>(dataverseName, datasetName);
}
- private static List<FunctionIdentifier> fieldAccessFunctions = new ArrayList<FunctionIdentifier>();
+ private static List<FunctionIdentifier> fieldAccessFunctions = new ArrayList<>();
static {
fieldAccessFunctions.add(BuiltinFunctions.GET_DATA);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
index f0917af..13ff0ee 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
@@ -87,7 +87,8 @@
// https://issues.apache.org/jira/browse/ASTERIXDB-1618
if (dataSource.getDatasourceType() != DataSource.Type.EXTERNAL_DATASET
&& dataSource.getDatasourceType() != DataSource.Type.INTERNAL_DATASET
- && dataSource.getDatasourceType() != DataSource.Type.LOADABLE) {
+ && dataSource.getDatasourceType() != DataSource.Type.LOADABLE
+ && dataSource.getDatasourceType() != DataSource.Type.FUNCTION) {
IMutationDataSource mds = (IMutationDataSource) dataSource;
if (mds.isChange()) {
transformers = new ArrayList<>();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 4550ba6..0d02385 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -18,52 +18,17 @@
*/
package org.apache.asterix.optimizer.rules;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.external.feed.watch.FeedActivityDetails;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.asterix.metadata.declared.DataSource;
-import org.apache.asterix.metadata.declared.DataSourceId;
-import org.apache.asterix.metadata.declared.FeedDataSource;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.FeedConnection;
-import org.apache.asterix.metadata.entities.FeedPolicyEntity;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.utils.ConstantExpressionUtil;
-import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
-import org.apache.asterix.translator.util.PlanTranslationUtil;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
@@ -77,213 +42,24 @@
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
+ AbstractFunctionCallExpression f = getFunctionCall(opRef);
+ if (f == null) {
+ return false;
+ }
+ return BuiltinFunctions.getDatasourceTransformer(f.getFunctionIdentifier()).rewrite(opRef, context);
+ }
+
+ public static AbstractFunctionCallExpression getFunctionCall(Mutable<ILogicalOperator> opRef) {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
if (op.getOperatorTag() != LogicalOperatorTag.UNNEST) {
- return false;
+ return null;
}
UnnestOperator unnest = (UnnestOperator) op;
ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
- return false;
- }
- return handleFunction(opRef, context, unnest, (AbstractFunctionCallExpression) unnestExpr);
- }
-
- protected boolean handleFunction(Mutable<ILogicalOperator> opRef, IOptimizationContext context,
- UnnestOperator unnest, AbstractFunctionCallExpression f) throws AlgebricksException {
- FunctionIdentifier fid = f.getFunctionIdentifier();
- if (fid.equals(BuiltinFunctions.DATASET)) {
- if (unnest.getPositionalVariable() != null) {
- // TODO remove this after enabling the support of positional variables in data scan
- throw new AlgebricksException("No positional variables are allowed over datasets.");
- }
- ILogicalExpression expr = f.getArguments().get(0).getValue();
- if (expr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
- return false;
- }
- ConstantExpression ce = (ConstantExpression) expr;
- IAlgebricksConstantValue acv = ce.getValue();
- if (!(acv instanceof AsterixConstantValue)) {
- return false;
- }
- AsterixConstantValue acv2 = (AsterixConstantValue) acv;
- if (acv2.getObject().getType().getTypeTag() != ATypeTag.STRING) {
- return false;
- }
- String datasetArg = ((AString) acv2.getObject()).getStringValue();
- MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
- Pair<String, String> datasetReference = parseDatasetReference(metadataProvider, datasetArg);
- String dataverseName = datasetReference.first;
- String datasetName = datasetReference.second;
- Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException(
- "Could not find dataset " + datasetName + " in dataverse " + dataverseName);
- }
- DataSourceId asid = new DataSourceId(dataverseName, datasetName);
- List<LogicalVariable> variables = new ArrayList<>();
- if (dataset.getDatasetType() == DatasetType.INTERNAL) {
- int numPrimaryKeys = dataset.getPrimaryKeys().size();
- for (int i = 0; i < numPrimaryKeys; i++) {
- variables.add(context.newVar());
- }
- }
- variables.add(unnest.getVariable());
- DataSource dataSource = metadataProvider.findDataSource(asid);
- boolean hasMeta = dataSource.hasMeta();
- if (hasMeta) {
- variables.add(context.newVar());
- }
- DataSourceScanOperator scan = new DataSourceScanOperator(variables, dataSource);
- List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
- scanInpList.addAll(unnest.getInputs());
- opRef.setValue(scan);
- addPrimaryKey(variables, dataSource, context);
- context.computeAndSetTypeEnvironmentForOperator(scan);
- // Adds equivalence classes --- one equivalent class between a primary key
- // variable and a record field-access expression.
- IAType[] schemaTypes = dataSource.getSchemaTypes();
- ARecordType recordType =
- (ARecordType) (hasMeta ? schemaTypes[schemaTypes.length - 2] : schemaTypes[schemaTypes.length - 1]);
- ARecordType metaRecordType = (ARecordType) (hasMeta ? schemaTypes[schemaTypes.length - 1] : null);
- EquivalenceClassUtils.addEquivalenceClassesForPrimaryIndexAccess(scan, variables, recordType,
- metaRecordType, dataset, context);
- return true;
- } else if (fid.equals(BuiltinFunctions.FEED_COLLECT)) {
- if (unnest.getPositionalVariable() != null) {
- throw new AlgebricksException("No positional variables are allowed over feeds.");
- }
- String dataverse = ConstantExpressionUtil.getStringArgument(f, 0);
- String sourceFeedName = ConstantExpressionUtil.getStringArgument(f, 1);
- String getTargetFeed = ConstantExpressionUtil.getStringArgument(f, 2);
- String subscriptionLocation = ConstantExpressionUtil.getStringArgument(f, 3);
- String targetDataset = ConstantExpressionUtil.getStringArgument(f, 4);
- String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
- MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
- DataSourceId asid = new DataSourceId(dataverse, getTargetFeed);
- String policyName = metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
- FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, policyName);
- if (policy == null) {
- policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
- if (policy == null) {
- throw new AlgebricksException("Unknown feed policy:" + policyName);
- }
- }
- ArrayList<LogicalVariable> feedDataScanOutputVariables = new ArrayList<>();
- String csLocations = metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
- List<LogicalVariable> pkVars = new ArrayList<>();
- FeedDataSource ds = createFeedDataSource(asid, targetDataset, sourceFeedName, subscriptionLocation,
- metadataProvider, policy, outputType, csLocations, unnest.getVariable(), context, pkVars);
- // The order for feeds is <Record-Meta-PK>
- feedDataScanOutputVariables.add(unnest.getVariable());
- // Does it produce meta?
- if (ds.hasMeta()) {
- feedDataScanOutputVariables.add(context.newVar());
- }
- // Does it produce pk?
- if (ds.isChange()) {
- feedDataScanOutputVariables.addAll(pkVars);
- }
- DataSourceScanOperator scan = new DataSourceScanOperator(feedDataScanOutputVariables, ds);
- List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
- scanInpList.addAll(unnest.getInputs());
- opRef.setValue(scan);
- context.computeAndSetTypeEnvironmentForOperator(scan);
- return true;
- }
- return false;
- }
-
- private void addPrimaryKey(List<LogicalVariable> scanVariables, DataSource dataSource,
- IOptimizationContext context) {
- List<LogicalVariable> primaryKey = dataSource.getPrimaryKeyVariables(scanVariables);
- List<LogicalVariable> tail = new ArrayList<>();
- tail.addAll(scanVariables);
- FunctionalDependency pk = new FunctionalDependency(primaryKey, tail);
- context.addPrimaryKey(pk);
- }
-
- private FeedDataSource createFeedDataSource(DataSourceId aqlId, String targetDataset, String sourceFeedName,
- String subscriptionLocation, MetadataProvider metadataProvider, FeedPolicyEntity feedPolicy,
- String outputType, String locations, LogicalVariable recordVar, IOptimizationContext context,
- List<LogicalVariable> pkVars) throws AlgebricksException {
- if (!aqlId.getDataverseName().equals(metadataProvider.getDefaultDataverse() == null ? null
- : metadataProvider.getDefaultDataverse().getDataverseName())) {
return null;
}
- Dataset dataset = metadataProvider.findDataset(aqlId.getDataverseName(), targetDataset);
- ARecordType feedOutputType = (ARecordType) metadataProvider.findType(aqlId.getDataverseName(), outputType);
- Feed sourceFeed = metadataProvider.findFeed(aqlId.getDataverseName(), sourceFeedName);
- FeedConnection feedConnection =
- metadataProvider.findFeedConnection(aqlId.getDataverseName(), sourceFeedName, targetDataset);
- ARecordType metaType = null;
- // Does dataset have meta?
- if (dataset.hasMetaPart()) {
- String metaTypeName = FeedUtils.getFeedMetaTypeName(sourceFeed.getAdapterConfiguration());
- if (metaTypeName == null) {
- throw new AlgebricksException("Feed to a dataset with metadata doesn't have meta type specified");
- }
- String dataverseName = aqlId.getDataverseName();
- if (metaTypeName.contains(".")) {
- dataverseName = metaTypeName.substring(0, metaTypeName.indexOf('.'));
- metaTypeName = metaTypeName.substring(metaTypeName.indexOf('.') + 1);
- }
- metaType = (ARecordType) metadataProvider.findType(dataverseName, metaTypeName);
- }
- // Is a change feed?
- List<IAType> pkTypes = null;
- List<List<String>> partitioningKeys = null;
- List<Integer> keySourceIndicator = null;
- List<Mutable<ILogicalExpression>> keyAccessExpression = null;
- List<ScalarFunctionCallExpression> keyAccessScalarFunctionCallExpression;
- if (ExternalDataUtils.isChangeFeed(sourceFeed.getAdapterConfiguration())) {
- keyAccessExpression = new ArrayList<>();
- keyAccessScalarFunctionCallExpression = new ArrayList<>();
- pkTypes = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPrimaryKeyType();
- partitioningKeys = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
- if (dataset.hasMetaPart()) {
- keySourceIndicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
- }
- for (int i = 0; i < partitioningKeys.size(); i++) {
- List<String> key = partitioningKeys.get(i);
- if (keySourceIndicator == null || keySourceIndicator.get(i).intValue() == 0) {
- PlanTranslationUtil.prepareVarAndExpression(key, recordVar, pkVars, keyAccessExpression, null,
- context);
- } else {
- PlanTranslationUtil.prepareMetaKeyAccessExpression(key, recordVar, keyAccessExpression, pkVars,
- null, context);
- }
- }
- keyAccessExpression.forEach(
- expr -> keyAccessScalarFunctionCallExpression.add((ScalarFunctionCallExpression) expr.getValue()));
- } else {
- keyAccessExpression = null;
- keyAccessScalarFunctionCallExpression = null;
- }
- FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) context.getMetadataProvider(), sourceFeed,
- aqlId, targetDataset, feedOutputType, metaType, pkTypes, keyAccessScalarFunctionCallExpression,
- sourceFeed.getFeedId(), FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
- context.getComputationNodeDomain(), feedConnection);
- feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
- return feedDataSource;
+ return (AbstractFunctionCallExpression) unnestExpr;
}
- private Pair<String, String> parseDatasetReference(MetadataProvider metadataProvider, String datasetArg)
- throws AlgebricksException {
- String[] datasetNameComponents = datasetArg.split("\\.");
- String dataverseName;
- String datasetName;
- if (datasetNameComponents.length == 1) {
- Dataverse defaultDataverse = metadataProvider.getDefaultDataverse();
- if (defaultDataverse == null) {
- throw new AlgebricksException("Unresolved dataset " + datasetArg + " Dataverse not specified.");
- }
- dataverseName = defaultDataverse.getDataverseName();
- datasetName = datasetNameComponents[0];
- } else {
- dataverseName = datasetNameComponents[0];
- datasetName = datasetNameComponents[1];
- }
- return new Pair<>(dataverseName, datasetName);
- }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
index 7633f4c..eaea208 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
@@ -24,6 +24,7 @@
import java.util.Set;
import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.metadata.declared.DataSource;
import org.apache.asterix.metadata.declared.DatasetDataSource;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -261,7 +262,12 @@
Dataset dataset;
// case 1: dataset scan
if (scanOperator.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
- dataset = ((DatasetDataSource)((DataSourceScanOperator)scanOperator).getDataSource()).getDataset();
+ DataSourceScanOperator dss = (DataSourceScanOperator) scanOperator;
+ DataSource ds = (DataSource) dss.getDataSource();
+ if (ds.getDatasourceType() != DataSource.Type.INTERNAL_DATASET) {
+ return null;
+ }
+ dataset = ((DatasetDataSource) ds).getDataset();
} else {
// case 2: dataset range search
AbstractFunctionCallExpression primaryIndexFunctionCall =
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesDatasource.java
new file mode 100644
index 0000000..e6b025a
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesDatasource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class DatasetResourcesDatasource extends FunctionDataSource {
+ private final int datasetId;
+
+ public DatasetResourcesDatasource(INodeDomain domain, int datasetId) throws AlgebricksException {
+ super(new DataSourceId(DatasetResourcesRewriter.DATASET_RESOURCES.getNamespace(),
+ DatasetResourcesRewriter.DATASET_RESOURCES.getName()), domain);
+ this.datasetId = datasetId;
+ }
+
+ @Override
+ protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+ AlgebricksAbsolutePartitionConstraint locations) {
+ return new DatasetResourcesFunction(locations, datasetId);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesFunction.java
new file mode 100644
index 0000000..05d192e
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class DatasetResourcesFunction extends AbstractDatasourceFunction {
+
+ private static final long serialVersionUID = 1L;
+ private final int datasetId;
+
+ public DatasetResourcesFunction(AlgebricksAbsolutePartitionConstraint locations, int datasetId) {
+ super(locations);
+ this.datasetId = datasetId;
+ }
+
+ @Override
+ public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition) {
+ INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
+ INcApplicationContext appCtx = (INcApplicationContext) serviceCtx.getApplicationContext();
+ DatasetLifecycleManager dsLifecycleMgr = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager();
+ DatasetResource dsr = dsLifecycleMgr.getDatasetLifecycle(datasetId);
+ return new DatasetResourcesReader(dsr);
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesReader.java
new file mode 100644
index 0000000..bce2002
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesReader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class DatasetResourcesReader extends FunctionReader {
+
+ private final List<String> components;
+ private final Iterator<String> it;
+ private final CharArrayRecord record;
+
+ public DatasetResourcesReader(DatasetResource dsr) {
+ components = new ArrayList<>();
+ if (dsr != null && dsr.isOpen()) {
+ Map<Long, IndexInfo> indexes = dsr.getIndexes();
+ for (Entry<Long, IndexInfo> entry : indexes.entrySet()) {
+ IndexInfo value = entry.getValue();
+ ILSMIndex index = value.getIndex();
+ components.add(index.toString());
+ }
+ record = new CharArrayRecord();
+ } else {
+ record = null;
+ }
+ it = components.iterator();
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ return it.hasNext();
+ }
+
+ @Override
+ public IRawRecord<char[]> next() throws IOException, InterruptedException {
+ record.reset();
+ record.append(it.next().toCharArray());
+ record.endRecord();
+ return record;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesRewriter.java
new file mode 100644
index 0000000..a575ba4
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesRewriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class DatasetResourcesRewriter extends FunctionRewriter {
+
+ // Parameters are dataverse name, and dataset name
+ public static final FunctionIdentifier DATASET_RESOURCES =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "dataset-resources", 2);
+ public static final DatasetResourcesRewriter INSTANCE = new DatasetResourcesRewriter(DATASET_RESOURCES);
+
+ private DatasetResourcesRewriter(FunctionIdentifier functionId) {
+ super(functionId);
+ }
+
+ @Override
+ public DatasetResourcesDatasource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+ throws AlgebricksException {
+ String dataverseName = getString(f.getArguments(), 0);
+ String datasetName = getString(f.getArguments(), 1);
+ MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+ Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+ }
+ return new DatasetResourcesDatasource(context.getComputationNodeDomain(), dataset.getDatasetId());
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
new file mode 100644
index 0000000..c857ce0
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter;
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+
+public class DatasetRewriter implements IFunctionToDataSourceRewriter, IResultTypeComputer {
+ public static final DatasetRewriter INSTANCE = new DatasetRewriter();
+
+ private DatasetRewriter() {
+ }
+
+ @Override
+ public boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ AbstractFunctionCallExpression f = UnnestToDataScanRule.getFunctionCall(opRef);
+ UnnestOperator unnest = (UnnestOperator) opRef.getValue();
+ if (unnest.getPositionalVariable() != null) {
+ // TODO remove this after enabling the support of positional variables in data scan
+ throw new AlgebricksException("No positional variables are allowed over datasets.");
+ }
+ ILogicalExpression expr = f.getArguments().get(0).getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+ return false;
+ }
+ ConstantExpression ce = (ConstantExpression) expr;
+ IAlgebricksConstantValue acv = ce.getValue();
+ if (!(acv instanceof AsterixConstantValue)) {
+ return false;
+ }
+ AsterixConstantValue acv2 = (AsterixConstantValue) acv;
+ if (acv2.getObject().getType().getTypeTag() != ATypeTag.STRING) {
+ return false;
+ }
+ String datasetArg = ((AString) acv2.getObject()).getStringValue();
+ MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+ Pair<String, String> datasetReference = parseDatasetReference(metadataProvider, datasetArg);
+ String dataverseName = datasetReference.first;
+ String datasetName = datasetReference.second;
+ Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+ }
+ DataSourceId asid = new DataSourceId(dataverseName, datasetName);
+ List<LogicalVariable> variables = new ArrayList<>();
+ if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+ int numPrimaryKeys = dataset.getPrimaryKeys().size();
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ variables.add(context.newVar());
+ }
+ }
+ variables.add(unnest.getVariable());
+ DataSource dataSource = metadataProvider.findDataSource(asid);
+ boolean hasMeta = dataSource.hasMeta();
+ if (hasMeta) {
+ variables.add(context.newVar());
+ }
+ DataSourceScanOperator scan = new DataSourceScanOperator(variables, dataSource);
+ List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+ scanInpList.addAll(unnest.getInputs());
+ opRef.setValue(scan);
+ addPrimaryKey(variables, dataSource, context);
+ context.computeAndSetTypeEnvironmentForOperator(scan);
+ // Adds equivalence classes --- one equivalent class between a primary key
+ // variable and a record field-access expression.
+ IAType[] schemaTypes = dataSource.getSchemaTypes();
+ ARecordType recordType =
+ (ARecordType) (hasMeta ? schemaTypes[schemaTypes.length - 2] : schemaTypes[schemaTypes.length - 1]);
+ ARecordType metaRecordType = (ARecordType) (hasMeta ? schemaTypes[schemaTypes.length - 1] : null);
+ EquivalenceClassUtils.addEquivalenceClassesForPrimaryIndexAccess(scan, variables, recordType, metaRecordType,
+ dataset, context);
+ return true;
+ }
+
+ private Pair<String, String> parseDatasetReference(MetadataProvider metadataProvider, String datasetArg)
+ throws AlgebricksException {
+ String[] datasetNameComponents = datasetArg.split("\\.");
+ String dataverseName;
+ String datasetName;
+ if (datasetNameComponents.length == 1) {
+ Dataverse defaultDataverse = metadataProvider.getDefaultDataverse();
+ if (defaultDataverse == null) {
+ throw new AlgebricksException("Unresolved dataset " + datasetArg + " Dataverse not specified.");
+ }
+ dataverseName = defaultDataverse.getDataverseName();
+ datasetName = datasetNameComponents[0];
+ } else {
+ dataverseName = datasetNameComponents[0];
+ datasetName = datasetNameComponents[1];
+ }
+ return new Pair<>(dataverseName, datasetName);
+ }
+
+ private void addPrimaryKey(List<LogicalVariable> scanVariables, DataSource dataSource,
+ IOptimizationContext context) {
+ List<LogicalVariable> primaryKey = dataSource.getPrimaryKeyVariables(scanVariables);
+ List<LogicalVariable> tail = new ArrayList<>();
+ tail.addAll(scanVariables);
+ FunctionalDependency pk = new FunctionalDependency(primaryKey, tail);
+ context.addPrimaryKey(pk);
+ }
+
+ @Override
+ public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp)
+ throws AlgebricksException {
+ AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+ if (f.getArguments().size() != 1) {
+ throw new AlgebricksException("dataset arity is 1, not " + f.getArguments().size());
+ }
+ ILogicalExpression a1 = f.getArguments().get(0).getValue();
+ IAType t1 = (IAType) env.getType(a1);
+ if (t1.getTypeTag() == ATypeTag.ANY) {
+ return BuiltinType.ANY;
+ }
+ if (t1.getTypeTag() != ATypeTag.STRING) {
+ throw new AlgebricksException("Illegal type " + t1 + " for dataset() argument.");
+ }
+ String datasetArg = ConstantExpressionUtil.getStringConstant(a1);
+ if (datasetArg == null) {
+ return BuiltinType.ANY;
+ }
+ MetadataProvider metadata = (MetadataProvider) mp;
+ Pair<String, String> datasetInfo = DatasetUtil.getDatasetInfo(metadata, datasetArg);
+ String dataverseName = datasetInfo.first;
+ String datasetName = datasetInfo.second;
+ if (dataverseName == null) {
+ throw new AlgebricksException("Unspecified dataverse!");
+ }
+ Dataset dataset = metadata.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+ }
+ String tn = dataset.getItemTypeName();
+ IAType t2 = metadata.findType(dataset.getItemTypeDataverseName(), tn);
+ if (t2 == null) {
+ throw new AlgebricksException("No type for dataset " + datasetName);
+ }
+ return t2;
+
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
new file mode 100644
index 0000000..ee3976e
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.external.feed.watch.FeedActivityDetails;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FeedDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter;
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.asterix.translator.util.PlanTranslationUtil;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+
+public class FeedRewriter implements IFunctionToDataSourceRewriter, IResultTypeComputer {
+ public static final FeedRewriter INSTANCE = new FeedRewriter();
+
+ private FeedRewriter() {
+ }
+
+ @Override
+ public boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ AbstractFunctionCallExpression f = UnnestToDataScanRule.getFunctionCall(opRef);
+ UnnestOperator unnest = (UnnestOperator) opRef.getValue();
+ if (unnest.getPositionalVariable() != null) {
+ throw new AlgebricksException("No positional variables are allowed over feeds.");
+ }
+ String dataverse = ConstantExpressionUtil.getStringArgument(f, 0);
+ String sourceFeedName = ConstantExpressionUtil.getStringArgument(f, 1);
+ String getTargetFeed = ConstantExpressionUtil.getStringArgument(f, 2);
+ String subscriptionLocation = ConstantExpressionUtil.getStringArgument(f, 3);
+ String targetDataset = ConstantExpressionUtil.getStringArgument(f, 4);
+ String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
+ MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+ DataSourceId asid = new DataSourceId(dataverse, getTargetFeed);
+ String policyName = metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
+ FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, policyName);
+ if (policy == null) {
+ policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
+ if (policy == null) {
+ throw new AlgebricksException("Unknown feed policy:" + policyName);
+ }
+ }
+ ArrayList<LogicalVariable> feedDataScanOutputVariables = new ArrayList<>();
+ String csLocations = metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
+ List<LogicalVariable> pkVars = new ArrayList<>();
+ FeedDataSource ds = createFeedDataSource(asid, targetDataset, sourceFeedName, subscriptionLocation,
+ metadataProvider, policy, outputType, csLocations, unnest.getVariable(), context, pkVars);
+ // The order for feeds is <Record-Meta-PK>
+ feedDataScanOutputVariables.add(unnest.getVariable());
+ // Does it produce meta?
+ if (ds.hasMeta()) {
+ feedDataScanOutputVariables.add(context.newVar());
+ }
+ // Does it produce pk?
+ if (ds.isChange()) {
+ feedDataScanOutputVariables.addAll(pkVars);
+ }
+ DataSourceScanOperator scan = new DataSourceScanOperator(feedDataScanOutputVariables, ds);
+ List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+ scanInpList.addAll(unnest.getInputs());
+ opRef.setValue(scan);
+ context.computeAndSetTypeEnvironmentForOperator(scan);
+ return true;
+ }
+
+ private FeedDataSource createFeedDataSource(DataSourceId aqlId, String targetDataset, String sourceFeedName,
+ String subscriptionLocation, MetadataProvider metadataProvider, FeedPolicyEntity feedPolicy,
+ String outputType, String locations, LogicalVariable recordVar, IOptimizationContext context,
+ List<LogicalVariable> pkVars) throws AlgebricksException {
+ Dataset dataset = metadataProvider.findDataset(aqlId.getDataverseName(), targetDataset);
+ ARecordType feedOutputType = (ARecordType) metadataProvider.findType(aqlId.getDataverseName(), outputType);
+ Feed sourceFeed = metadataProvider.findFeed(aqlId.getDataverseName(), sourceFeedName);
+ FeedConnection feedConnection =
+ metadataProvider.findFeedConnection(aqlId.getDataverseName(), sourceFeedName, targetDataset);
+ ARecordType metaType = null;
+ // Does dataset have meta?
+ if (dataset.hasMetaPart()) {
+ String metaTypeName = FeedUtils.getFeedMetaTypeName(sourceFeed.getAdapterConfiguration());
+ if (metaTypeName == null) {
+ throw new AlgebricksException("Feed to a dataset with metadata doesn't have meta type specified");
+ }
+ String dataverseName = aqlId.getDataverseName();
+ if (metaTypeName.contains(".")) {
+ dataverseName = metaTypeName.substring(0, metaTypeName.indexOf('.'));
+ metaTypeName = metaTypeName.substring(metaTypeName.indexOf('.') + 1);
+ }
+ metaType = (ARecordType) metadataProvider.findType(dataverseName, metaTypeName);
+ }
+ // Is a change feed?
+ List<IAType> pkTypes = null;
+ List<List<String>> partitioningKeys = null;
+ List<Integer> keySourceIndicator = null;
+
+ List<ScalarFunctionCallExpression> keyAccessScalarFunctionCallExpression;
+ if (ExternalDataUtils.isChangeFeed(sourceFeed.getAdapterConfiguration())) {
+ List<Mutable<ILogicalExpression>> keyAccessExpression = new ArrayList<>();
+ keyAccessScalarFunctionCallExpression = new ArrayList<>();
+ pkTypes = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPrimaryKeyType();
+ partitioningKeys = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
+ if (dataset.hasMetaPart()) {
+ keySourceIndicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+ }
+ for (int i = 0; i < partitioningKeys.size(); i++) {
+ List<String> key = partitioningKeys.get(i);
+ if (keySourceIndicator == null || keySourceIndicator.get(i).intValue() == 0) {
+ PlanTranslationUtil.prepareVarAndExpression(key, recordVar, pkVars, keyAccessExpression, null,
+ context);
+ } else {
+ PlanTranslationUtil.prepareMetaKeyAccessExpression(key, recordVar, keyAccessExpression, pkVars,
+ null, context);
+ }
+ }
+ keyAccessExpression.forEach(
+ expr -> keyAccessScalarFunctionCallExpression.add((ScalarFunctionCallExpression) expr.getValue()));
+ } else {
+ keyAccessScalarFunctionCallExpression = null;
+ }
+ FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) context.getMetadataProvider(), sourceFeed,
+ aqlId, targetDataset, feedOutputType, metaType, pkTypes, keyAccessScalarFunctionCallExpression,
+ sourceFeed.getFeedId(), FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
+ context.getComputationNodeDomain(), feedConnection);
+ feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
+ return feedDataSource;
+ }
+
+ @Override
+ public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp)
+ throws AlgebricksException {
+ AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+ if (f.getArguments().size() != BuiltinFunctions.FEED_COLLECT.getArity()) {
+ throw new AlgebricksException("Incorrect number of arguments -> arity is "
+ + BuiltinFunctions.FEED_COLLECT.getArity() + ", not " + f.getArguments().size());
+ }
+ ILogicalExpression a1 = f.getArguments().get(5).getValue();
+ IAType t1 = (IAType) env.getType(a1);
+ if (t1.getTypeTag() == ATypeTag.ANY) {
+ return BuiltinType.ANY;
+ }
+ if (t1.getTypeTag() != ATypeTag.STRING) {
+ throw new AlgebricksException("Illegal type " + t1 + " for feed-ingest argument.");
+ }
+ String typeArg = ConstantExpressionUtil.getStringConstant(a1);
+ if (typeArg == null) {
+ return BuiltinType.ANY;
+ }
+ MetadataProvider metadata = (MetadataProvider) mp;
+ Pair<String, String> argInfo = DatasetUtil.getDatasetInfo(metadata, typeArg);
+ String dataverseName = argInfo.first;
+ String typeName = argInfo.second;
+ if (dataverseName == null) {
+ throw new AlgebricksException("Unspecified dataverse!");
+ }
+ IAType t2 = metadata.findType(dataverseName, typeName);
+ if (t2 == null) {
+ throw new AlgebricksException("Unknown type " + typeName);
+ }
+ return t2;
+
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
new file mode 100644
index 0000000..c73f8e8
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class FunctionReader implements IRecordReader<char[]> {
+
+ @Override
+ public void close() throws IOException {
+ // No Op for function reader
+ }
+
+ @Override
+ public boolean stop() {
+ return true;
+ }
+
+ @Override
+ public void setController(AbstractFeedDataFlowController controller) {
+ // No Op for function reader
+ }
+
+ @Override
+ public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean handleException(Throwable th) {
+ return false;
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionRewriter.java
new file mode 100644
index 0000000..2ff9282
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionRewriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+
+public abstract class FunctionRewriter implements IFunctionToDataSourceRewriter {
+
+ private FunctionIdentifier functionId;
+
+ public FunctionRewriter(FunctionIdentifier functionId) {
+ this.functionId = functionId;
+ }
+
+ @Override
+ public final boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractFunctionCallExpression f = UnnestToDataScanRule.getFunctionCall(opRef);
+ List<Mutable<ILogicalExpression>> args = f.getArguments();
+ if (args.size() != functionId.getArity()) {
+ throw new AlgebricksException("Function " + functionId.getNamespace() + "." + functionId.getName()
+ + " expects " + functionId.getArity() + " arguments");
+ }
+ for (int i = 0; i < args.size(); i++) {
+ if (args.get(i).getValue().getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+ throw new AlgebricksException("Function " + functionId.getNamespace() + "." + functionId.getName()
+ + " expects constant arguments while arg[" + i + "] is of type "
+ + args.get(i).getValue().getExpressionTag());
+ }
+ }
+ UnnestOperator unnest = (UnnestOperator) opRef.getValue();
+ if (unnest.getPositionalVariable() != null) {
+ throw new AlgebricksException("No positional variables are allowed over datasource functions");
+ }
+ FunctionDataSource datasource = toDatasource(context, f);
+ List<LogicalVariable> variables = new ArrayList<>();
+ variables.add(unnest.getVariable());
+ DataSourceScanOperator scan = new DataSourceScanOperator(variables, datasource);
+ List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+ scanInpList.addAll(unnest.getInputs());
+ opRef.setValue(scan);
+ context.computeAndSetTypeEnvironmentForOperator(scan);
+ return true;
+ }
+
+ protected abstract FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+ throws AlgebricksException;
+
+ protected String getString(List<Mutable<ILogicalExpression>> args, int i) throws AlgebricksException {
+ ConstantExpression ce = (ConstantExpression) args.get(i).getValue();
+ IAlgebricksConstantValue acv = ce.getValue();
+ if (!(acv instanceof AsterixConstantValue)) {
+ throw new AlgebricksException("Expected arg[" + i + "] to be of type String");
+ }
+ AsterixConstantValue acv2 = (AsterixConstantValue) acv;
+ if (acv2.getObject().getType().getTypeTag() != ATypeTag.STRING) {
+ throw new AlgebricksException("Expected arg[" + i + "] to be of type String");
+ }
+ return ((AString) acv2.getObject()).getStringValue();
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsDatasource.java
new file mode 100644
index 0000000..7245b88
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsDatasource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class StorageComponentsDatasource extends FunctionDataSource {
+ private final int datasetId;
+
+ public StorageComponentsDatasource(INodeDomain domain, int datasetId) throws AlgebricksException {
+ super(new DataSourceId(StorageComponentsRewriter.STORAGE_COMPONENTS.getNamespace(),
+ StorageComponentsRewriter.STORAGE_COMPONENTS.getName()), domain);
+ this.datasetId = datasetId;
+ }
+
+ @Override
+ protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+ AlgebricksAbsolutePartitionConstraint locations) {
+ return new StorageComponentsFunction(locations, datasetId);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsFunction.java
new file mode 100644
index 0000000..73b2d0e
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class StorageComponentsFunction extends AbstractDatasourceFunction {
+
+ private static final long serialVersionUID = 1L;
+ private final int datasetId;
+
+ public StorageComponentsFunction(AlgebricksAbsolutePartitionConstraint locations, int datasetId) {
+ super(locations);
+ this.datasetId = datasetId;
+ }
+
+ @Override
+ public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
+ INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
+ INcApplicationContext appCtx = (INcApplicationContext) serviceCtx.getApplicationContext();
+ DatasetLifecycleManager dsLifecycleMgr = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager();
+ DatasetResource dsr = dsLifecycleMgr.getDatasetLifecycle(datasetId);
+ return new StorageComponentsReader(dsr);
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsReader.java
new file mode 100644
index 0000000..4958d14
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsReader.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+
+public class StorageComponentsReader extends FunctionReader {
+
+ private final List<String> components;
+ private final Iterator<String> it;
+ private final CharArrayRecord record;
+
+ public StorageComponentsReader(DatasetResource dsr) throws HyracksDataException {
+ components = new ArrayList<>();
+ if (dsr != null && dsr.isOpen()) {
+ Map<Long, IndexInfo> indexes = dsr.getIndexes();
+ StringBuilder strBuilder = new StringBuilder();
+ for (Entry<Long, IndexInfo> entry : indexes.entrySet()) {
+ strBuilder.setLength(0);
+ IndexInfo value = entry.getValue();
+ ILSMIndex index = value.getIndex();
+ String path = value.getLocalResource().getPath();
+ strBuilder.append('{');
+ strBuilder.append("\"path\":\"");
+ strBuilder.append(path);
+ strBuilder.append("\", \"components\":[");
+ // syncronize over the opTracker
+ synchronized (index.getOperationTracker()) {
+ List<ILSMDiskComponent> diskComponents = index.getDiskComponents();
+ for (int i = diskComponents.size() - 1; i >= 0; i--) {
+ if (i < diskComponents.size() - 1) {
+ strBuilder.append(',');
+ }
+ ILSMDiskComponent c = diskComponents.get(i);
+ LSMComponentId id = (LSMComponentId) c.getId();
+ strBuilder.append('{');
+ strBuilder.append("\"min\":");
+ strBuilder.append(id.getMinId());
+ strBuilder.append(",\"max\":");
+ strBuilder.append(id.getMaxId());
+ strBuilder.append('}');
+ }
+ }
+ strBuilder.append("]}");
+ components.add(strBuilder.toString());
+ }
+ record = new CharArrayRecord();
+ } else {
+ record = null;
+ }
+ it = components.iterator();
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ return it.hasNext();
+ }
+
+ @Override
+ public IRawRecord<char[]> next() throws IOException, InterruptedException {
+ record.reset();
+ record.append(it.next().toCharArray());
+ record.endRecord();
+ return record;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsRewriter.java
new file mode 100644
index 0000000..89bd115
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsRewriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class StorageComponentsRewriter extends FunctionRewriter {
+
+ // Parameters are dataverse name, and dataset name
+ public static final FunctionIdentifier STORAGE_COMPONENTS =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "storage-components", 2);
+ public static final StorageComponentsRewriter INSTANCE = new StorageComponentsRewriter(STORAGE_COMPONENTS);
+
+ private StorageComponentsRewriter(FunctionIdentifier functionId) {
+ super(functionId);
+ }
+
+ @Override
+ public StorageComponentsDatasource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+ throws AlgebricksException {
+ String dataverseName = getString(f.getArguments(), 0);
+ String datasetName = getString(f.getArguments(), 1);
+ MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+ Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+ }
+ return new StorageComponentsDatasource(context.getComputationNodeDomain(), dataset.getDatasetId());
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 670b2bd..622e28f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -82,6 +82,7 @@
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.util.MetadataBuiltinFunctions;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
@@ -137,6 +138,7 @@
String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
hcc = new HyracksConnection(strIP, port);
+ MetadataBuiltinFunctions.init();
ILibraryManager libraryManager = new ExternalLibraryManager();
ReplicationProperties repProp = new ReplicationProperties(
PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index a05b2bb..e1a75cb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -45,6 +45,7 @@
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.asterix.util.MetadataBuiltinFunctions;
import org.apache.asterix.utils.CompatibilityUtil;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
@@ -104,6 +105,7 @@
System.setProperty("java.rmi.server.hostname",
(controllerService).getConfiguration().getClusterPublicAddress());
}
+ MetadataBuiltinFunctions.init();
runtimeContext = new NCAppRuntimeContext(ncServiceCtx, getExtensions());
MetadataProperties metadataProperties = runtimeContext.getMetadataProperties();
if (!metadataProperties.getNodeNames().contains(this.ncServiceCtx.getNodeId())) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
new file mode 100644
index 0000000..c143a63
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.util;
+
+import org.apache.asterix.app.function.DatasetResourcesRewriter;
+import org.apache.asterix.app.function.DatasetRewriter;
+import org.apache.asterix.app.function.FeedRewriter;
+import org.apache.asterix.app.function.StorageComponentsRewriter;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.utils.RecordUtil;
+
+public class MetadataBuiltinFunctions {
+
+ static {
+ // Dataset function
+ BuiltinFunctions.addFunction(BuiltinFunctions.DATASET, DatasetRewriter.INSTANCE, true);
+ BuiltinFunctions.addUnnestFun(BuiltinFunctions.DATASET, false);
+ BuiltinFunctions.addDatasourceFunction(BuiltinFunctions.DATASET, DatasetRewriter.INSTANCE);
+ // Feed collect function
+ BuiltinFunctions.addPrivateFunction(BuiltinFunctions.FEED_COLLECT, FeedRewriter.INSTANCE, true);
+ BuiltinFunctions.addUnnestFun(BuiltinFunctions.FEED_COLLECT, false);
+ BuiltinFunctions.addDatasourceFunction(BuiltinFunctions.FEED_COLLECT, FeedRewriter.INSTANCE);
+ // Dataset resources function
+ BuiltinFunctions.addPrivateFunction(DatasetResourcesRewriter.DATASET_RESOURCES,
+ (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
+ BuiltinFunctions.addUnnestFun(DatasetResourcesRewriter.DATASET_RESOURCES, false);
+ BuiltinFunctions.addDatasourceFunction(DatasetResourcesRewriter.DATASET_RESOURCES,
+ DatasetResourcesRewriter.INSTANCE);
+ // Dataset components function
+ BuiltinFunctions.addPrivateFunction(StorageComponentsRewriter.STORAGE_COMPONENTS,
+ (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
+ BuiltinFunctions.addUnnestFun(StorageComponentsRewriter.STORAGE_COMPONENTS, false);
+ BuiltinFunctions.addDatasourceFunction(StorageComponentsRewriter.STORAGE_COMPONENTS,
+ StorageComponentsRewriter.INSTANCE);
+
+ }
+
+ private MetadataBuiltinFunctions() {
+ }
+
+ public static void init() {
+ // Only execute the static block
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 9dddda4..ac249b7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -54,7 +54,6 @@
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedConstants;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.asterix.file.StorageComponentProvider;
@@ -87,6 +86,7 @@
import org.apache.asterix.metadata.entities.FeedPolicyEntity;
import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
import org.apache.asterix.metadata.feeds.LocationConstraint;
+import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.runtime.job.listener.MultiTransactionJobletEventListenerFactory;
import org.apache.asterix.runtime.utils.RuntimeUtils;
@@ -202,7 +202,7 @@
addArgs(feedConnection.getDataverseName(), feedConnection.getFeedId().getEntityName(),
feedConnection.getFeedId().getEntityName(), FeedRuntimeType.INTAKE.toString(),
feedConnection.getDatasetName(), feedConnection.getOutputType());
- CallExpr datasrouceCallFunction = new CallExpr(new FunctionSignature(FeedConstants.FEED_COLLECT_FUN), exprList);
+ CallExpr datasrouceCallFunction = new CallExpr(new FunctionSignature(BuiltinFunctions.FEED_COLLECT), exprList);
FromTerm fromterm = new FromTerm(datasrouceCallFunction, fromTermLeftExpr, null, null);
FromClause fromClause = new FromClause(Arrays.asList(fromterm));
// TODO: This can be the place to add select predicate for ingestion
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.1.ddl.sqlpp
new file mode 100644
index 0000000..ca38c20
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.1.ddl.sqlpp
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+
+create type test.LineItemType as
+ closed {
+ l_orderkey : bigint,
+ l_partkey : bigint,
+ l_suppkey : bigint,
+ l_linenumber : bigint,
+ l_quantity : double,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+};
+
+create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
+create index idx_partkey on LineItem (l_partkey) type btree;
+
+create primary index sec_primary_idx on LineItem;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.2.update.sqlpp
new file mode 100644
index 0000000..546a831
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+
+load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.3.query.sqlpp
new file mode 100644
index 0000000..53af94f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.3.query.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+set `import-private-functions` `true`;
+select value count(*) from dataset_resources('Metadata','Dataset') dsr;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.4.query.sqlpp
new file mode 100644
index 0000000..33b4b33
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.4.query.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+set `import-private-functions` `true`;
+select value (( select value count(*) from dataset_resources('test','LineItem') resource )[0] > 2);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.5.query.sqlpp
new file mode 100644
index 0000000..b396dd4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.5.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+set `import-private-functions` `true`;
+select value (( select value count(*) from storage_components('test','LineItem') resource )[0] > 2);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.3.adm
new file mode 100644
index 0000000..d00491f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.3.adm
@@ -0,0 +1 @@
+1
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.4.adm
new file mode 100644
index 0000000..27ba77d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.4.adm
@@ -0,0 +1 @@
+true
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.5.adm
new file mode 100644
index 0000000..27ba77d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.5.adm
@@ -0,0 +1 @@
+true
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 0c6493d..9ce7eb3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -3445,6 +3445,11 @@
</test-group>
<test-group name="misc">
<test-case FilePath="misc">
+ <compilation-unit name="dataset-resources">
+ <output-dir compare="Text">dataset-resources</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="misc">
<compilation-unit name="case_01">
<output-dir compare="Text">case_01</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 1bb9c11..fc59f68 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.external.adapter.factory;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -36,6 +37,7 @@
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.asterix.external.dataset.adapter.GenericAdapter;
import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.parser.factory.ADMDataParserFactory;
import org.apache.asterix.external.provider.DataflowControllerProvider;
import org.apache.asterix.external.provider.DatasourceFactoryProvider;
import org.apache.asterix.external.provider.ParserFactoryProvider;
@@ -45,6 +47,7 @@
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.utils.RecordUtil;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -203,4 +206,20 @@
public IExternalDataSourceFactory getDataSourceFactory() {
return dataSourceFactory;
}
+
+ /**
+ * Use pre-configured datasource factory
+ * For function datasources
+ *
+ * @param dataSourceFactory
+ * the function datasource factory
+ * @throws AlgebricksException
+ */
+ public void configure(IExternalDataSourceFactory dataSourceFactory) throws AlgebricksException {
+ this.dataSourceFactory = dataSourceFactory;
+ dataParserFactory = new ADMDataParserFactory();
+ dataParserFactory.setRecordType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
+ dataParserFactory.configure(Collections.emptyMap());
+ configuration = Collections.emptyMap();
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
index e2fa6db..a29d66c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
@@ -18,13 +18,8 @@
*/
package org.apache.asterix.external.util;
-import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-
public class FeedConstants {
- public static final FunctionIdentifier FEED_COLLECT_FUN = BuiltinFunctions.FEED_COLLECT;
-
public final static String FEEDS_METADATA_DV = "feeds_metadata";
public final static String FAILED_TUPLE_DATASET = "failed_tuple";
public final static String FAILED_TUPLE_DATASET_TYPE = "FailedTupleType";
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IDatasourceFunction.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IDatasourceFunction.java
new file mode 100644
index 0000000..336c2dc
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IDatasourceFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.api;
+
+import java.io.Serializable;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A function that is also a datasource
+ */
+public interface IDatasourceFunction extends Serializable {
+
+ /**
+ * @return the locations on which the function is to be run
+ */
+ AlgebricksAbsolutePartitionConstraint getPartitionConstraint();
+
+ /**
+ * The function record reader
+ *
+ * @param ctx
+ * @param partition
+ * @return
+ * @throws HyracksDataException
+ */
+ IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws HyracksDataException;
+
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBuiltinEntities.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBuiltinEntities.java
index d796fed..c4f5bcb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBuiltinEntities.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBuiltinEntities.java
@@ -22,8 +22,6 @@
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.metadata.utils.MetadataUtil;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.utils.RecordUtil;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
@@ -33,10 +31,8 @@
public static final Dataverse DEFAULT_DATAVERSE =
new Dataverse(DEFAULT_DATAVERSE_NAME, NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP);
//--------------------------------------- Datatypes -----------------------------------------//
- public static final ARecordType ANY_OBJECT_RECORD_TYPE =
- new ARecordType("AnyObject", new String[0], new IAType[0], true);
public static final Datatype ANY_OBJECT_DATATYPE = new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
- ANY_OBJECT_RECORD_TYPE.getTypeName(), RecordUtil.FULLY_OPEN_RECORD_TYPE, false);
+ RecordUtil.FULLY_OPEN_RECORD_TYPE.getTypeName(), RecordUtil.FULLY_OPEN_RECORD_TYPE, false);
private MetadataBuiltinEntities() {
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AbstractDatasourceFunction.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AbstractDatasourceFunction.java
new file mode 100644
index 0000000..0aff4c3
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AbstractDatasourceFunction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+
+public abstract class AbstractDatasourceFunction implements IDatasourceFunction {
+
+ private static final long serialVersionUID = 1L;
+ private final transient AlgebricksAbsolutePartitionConstraint locations;
+
+ public AbstractDatasourceFunction(AlgebricksAbsolutePartitionConstraint locations) {
+ this.locations = locations;
+ }
+
+ @Override
+ public final AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ return locations;
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSource.java
index fa874d5..ca22567 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSource.java
@@ -57,6 +57,7 @@
public static final byte EXTERNAL_DATASET = 0x01;
public static final byte FEED = 0x02;
public static final byte LOADABLE = 0x03;
+ public static final byte FUNCTION = 0x04;
// Hide implicit public constructor
private Type() {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
new file mode 100644
index 0000000..d2b9871
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public abstract class FunctionDataSource extends DataSource {
+
+ public FunctionDataSource(DataSourceId id, INodeDomain domain) throws AlgebricksException {
+ super(id, RecordUtil.FULLY_OPEN_RECORD_TYPE, null, DataSource.Type.FUNCTION, domain);
+ schemaTypes = new IAType[] { itemType };
+ }
+
+ @Override
+ public boolean isScanAccessPathALeaf() {
+ return true;
+ }
+
+ @Override
+ public IDataSourcePropertiesProvider getPropertiesProvider() {
+ // Unordered Random partitioning on all nodes
+ return scanVariables -> new StructuralPropertiesVector(new RandomPartitioningProperty(domain),
+ Collections.emptyList());
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceScanRuntime(
+ MetadataProvider metadataProvider, IDataSource<DataSourceId> dataSource,
+ List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
+ List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
+ IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
+ throws AlgebricksException {
+ GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
+ adapterFactory.setOutputType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
+ IClusterStateManager csm = metadataProvider.getApplicationContext().getClusterStateManager();
+ FunctionDataSourceFactory factory =
+ new FunctionDataSourceFactory(createFunction(metadataProvider, getLocations(csm)));
+ adapterFactory.configure(factory);
+ return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory);
+ }
+
+ protected abstract IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+ AlgebricksAbsolutePartitionConstraint locations);
+
+ protected static AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
+ String[] allPartitions = csm.getClusterLocations().getLocations();
+ Set<String> ncs = new HashSet<>(Arrays.asList(allPartitions));
+ return new AlgebricksAbsolutePartitionConstraint(ncs.toArray(new String[ncs.size()]));
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
new file mode 100644
index 0000000..1e0ddbf
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FunctionDataSourceFactory implements IRecordReaderFactory<char[]> {
+
+ private static final long serialVersionUID = 1L;
+ private final IDatasourceFunction function;
+
+ public FunctionDataSourceFactory(IDatasourceFunction function) {
+ this.function = function;
+ }
+
+ @Override
+ public final DataSourceType getDataSourceType() {
+ return DataSourceType.RECORDS;
+ }
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
+ return function.getPartitionConstraint();
+ }
+
+ @Override
+ public void configure(IServiceContext ctx, Map<String, String> configuration)
+ throws AlgebricksException, HyracksDataException {
+ // No Op
+ }
+
+ @Override
+ public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
+ return function.createRecordReader(ctx, partition);
+ }
+
+ @Override
+ public Class<?> getRecordClass() {
+ return char[].class;
+ }
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return Collections.emptyList();
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java
deleted file mode 100644
index 137e625..0000000
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.functions;
-
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.utils.ConstantExpressionUtil;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
-
-public class MetadataBuiltinFunctions {
-
- static {
- addMetadataBuiltinFunctions();
- BuiltinFunctions.addUnnestFun(BuiltinFunctions.DATASET, false);
- BuiltinFunctions.addDatasetFunction(BuiltinFunctions.DATASET);
- BuiltinFunctions.addUnnestFun(BuiltinFunctions.FEED_COLLECT, false);
- BuiltinFunctions.addDatasetFunction(BuiltinFunctions.FEED_COLLECT);
- BuiltinFunctions.addUnnestFun(BuiltinFunctions.FEED_INTERCEPT, false);
- BuiltinFunctions.addDatasetFunction(BuiltinFunctions.FEED_INTERCEPT);
- }
-
- public static void addMetadataBuiltinFunctions() {
-
- BuiltinFunctions.addFunction(BuiltinFunctions.DATASET, new IResultTypeComputer() {
-
- @Override
- public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
- IMetadataProvider<?, ?> mp) throws AlgebricksException {
- AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
- if (f.getArguments().size() != 1) {
- throw new AlgebricksException("dataset arity is 1, not " + f.getArguments().size());
- }
- ILogicalExpression a1 = f.getArguments().get(0).getValue();
- IAType t1 = (IAType) env.getType(a1);
- if (t1.getTypeTag() == ATypeTag.ANY) {
- return BuiltinType.ANY;
- }
- if (t1.getTypeTag() != ATypeTag.STRING) {
- throw new AlgebricksException("Illegal type " + t1 + " for dataset() argument.");
- }
- String datasetArg = ConstantExpressionUtil.getStringConstant(a1);
- if (datasetArg == null) {
- return BuiltinType.ANY;
- }
- MetadataProvider metadata = (MetadataProvider) mp;
- Pair<String, String> datasetInfo = getDatasetInfo(metadata, datasetArg);
- String dataverseName = datasetInfo.first;
- String datasetName = datasetInfo.second;
- if (dataverseName == null) {
- throw new AlgebricksException("Unspecified dataverse!");
- }
- Dataset dataset = metadata.findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException(
- "Could not find dataset " + datasetName + " in dataverse " + dataverseName);
- }
- String tn = dataset.getItemTypeName();
- IAType t2 = metadata.findType(dataset.getItemTypeDataverseName(), tn);
- if (t2 == null) {
- throw new AlgebricksException("No type for dataset " + datasetName);
- }
- return t2;
- }
- }, true);
-
- BuiltinFunctions.addPrivateFunction(BuiltinFunctions.FEED_COLLECT, new IResultTypeComputer() {
-
- @Override
- public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
- IMetadataProvider<?, ?> mp) throws AlgebricksException {
- AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
- if (f.getArguments().size() != BuiltinFunctions.FEED_COLLECT.getArity()) {
- throw new AlgebricksException("Incorrect number of arguments -> arity is "
- + BuiltinFunctions.FEED_COLLECT.getArity() + ", not " + f.getArguments().size());
- }
- ILogicalExpression a1 = f.getArguments().get(5).getValue();
- IAType t1 = (IAType) env.getType(a1);
- if (t1.getTypeTag() == ATypeTag.ANY) {
- return BuiltinType.ANY;
- }
- if (t1.getTypeTag() != ATypeTag.STRING) {
- throw new AlgebricksException("Illegal type " + t1 + " for feed-ingest argument.");
- }
- String typeArg = ConstantExpressionUtil.getStringConstant(a1);
- if (typeArg == null) {
- return BuiltinType.ANY;
- }
- MetadataProvider metadata = (MetadataProvider) mp;
- Pair<String, String> argInfo = getDatasetInfo(metadata, typeArg);
- String dataverseName = argInfo.first;
- String typeName = argInfo.second;
- if (dataverseName == null) {
- throw new AlgebricksException("Unspecified dataverse!");
- }
- IAType t2 = metadata.findType(dataverseName, typeName);
- if (t2 == null) {
- throw new AlgebricksException("Unknown type " + typeName);
- }
- return t2;
- }
- }, true);
-
- BuiltinFunctions.addFunction(BuiltinFunctions.FEED_INTERCEPT, new IResultTypeComputer() {
-
- @Override
- public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
- IMetadataProvider<?, ?> mp) throws AlgebricksException {
- AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
- if (f.getArguments().size() != 1) {
- throw new AlgebricksException("dataset arity is 1, not " + f.getArguments().size());
- }
- ILogicalExpression a1 = f.getArguments().get(0).getValue();
- IAType t1 = (IAType) env.getType(a1);
- if (t1.getTypeTag() == ATypeTag.ANY) {
- return BuiltinType.ANY;
- }
- if (t1.getTypeTag() != ATypeTag.STRING) {
- throw new AlgebricksException("Illegal type " + t1 + " for dataset() argument.");
- }
- String datasetArg = ConstantExpressionUtil.getStringConstant(a1);
- if (datasetArg == null) {
- return BuiltinType.ANY;
- }
- MetadataProvider metadata = (MetadataProvider) mp;
- Pair<String, String> datasetInfo = getDatasetInfo(metadata, datasetArg);
- String dataverseName = datasetInfo.first;
- String datasetName = datasetInfo.second;
- if (dataverseName == null) {
- throw new AlgebricksException("Unspecified dataverse!");
- }
- Dataset dataset = metadata.findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AlgebricksException(
- "Could not find dataset " + datasetName + " in dataverse " + dataverseName);
- }
- String tn = dataset.getItemTypeName();
- IAType t2 = metadata.findType(dataset.getItemTypeDataverseName(), tn);
- if (t2 == null) {
- throw new AlgebricksException("No type for dataset " + datasetName);
- }
- return t2;
- }
- }, true);
- }
-
- private static Pair<String, String> getDatasetInfo(MetadataProvider metadata, String datasetArg) {
- String[] nameComponents = datasetArg.split("\\.");
- String first;
- String second;
- if (nameComponents.length == 1) {
- first = metadata.getDefaultDataverse() == null ? null : metadata.getDefaultDataverse().getDataverseName();
- second = nameComponents[0];
- } else {
- first = nameComponents[0];
- second = nameComponents[1];
- }
- return new Pair<String, String>(first, second);
- }
-}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index cf663eb..e38894c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -547,4 +547,19 @@
MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodeGroup, new ArrayList<>(ncNames)));
return nodeGroup;
}
+
+ // This doesn't work if the dataset or the dataverse name contains a '.'
+ public static Pair<String, String> getDatasetInfo(MetadataProvider metadata, String datasetArg) {
+ String[] nameComponents = datasetArg.split("\\.");
+ String first;
+ String second;
+ if (nameComponents.length == 1) {
+ first = metadata.getDefaultDataverse() == null ? null : metadata.getDefaultDataverse().getDataverseName();
+ second = nameComponents[0];
+ } else {
+ first = nameComponents[0];
+ second = nameComponents[1];
+ }
+ return new Pair<>(first, second);
+ }
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index e1f5bb0..dde61f9 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -134,7 +134,7 @@
private static final Map<IFunctionInfo, IResultTypeComputer> funTypeComputer = new HashMap<>();
private static final Set<IFunctionInfo> builtinAggregateFunctions = new HashSet<>();
- private static final Set<IFunctionInfo> datasetFunctions = new HashSet<>();
+ private static final Map<IFunctionInfo, IFunctionToDataSourceRewriter> datasourceFunctions = new HashMap<>();
private static final Set<IFunctionInfo> similarityFunctions = new HashSet<>();
private static final Set<IFunctionInfo> globalAggregateFunctions = new HashSet<>();
private static final Map<IFunctionInfo, IFunctionInfo> aggregateToLocalAggregate = new HashMap<>();
@@ -1280,13 +1280,6 @@
// unnesting function
addPrivateFunction(SCAN_COLLECTION, CollectionMemberResultType.INSTANCE, true);
- String metadataFunctionLoaderClassName = "org.apache.asterix.metadata.functions.MetadataBuiltinFunctions";
- try {
- Class.forName(metadataFunctionLoaderClassName);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
-
}
static {
@@ -1527,27 +1520,17 @@
}
static {
- datasetFunctions.add(getAsterixFunctionInfo(DATASET));
- datasetFunctions.add(getAsterixFunctionInfo(FEED_COLLECT));
- datasetFunctions.add(getAsterixFunctionInfo(FEED_INTERCEPT));
- datasetFunctions.add(getAsterixFunctionInfo(INDEX_SEARCH));
- }
-
- static {
- addUnnestFun(DATASET, false);
- addUnnestFun(FEED_COLLECT, false);
- addUnnestFun(FEED_INTERCEPT, false);
addUnnestFun(RANGE, true);
addUnnestFun(SCAN_COLLECTION, false);
addUnnestFun(SUBSET_COLLECTION, false);
}
- public static void addDatasetFunction(FunctionIdentifier fi) {
- datasetFunctions.add(getAsterixFunctionInfo(fi));
+ public static void addDatasourceFunction(FunctionIdentifier fi, IFunctionToDataSourceRewriter transformer) {
+ datasourceFunctions.put(getAsterixFunctionInfo(fi), transformer);
}
- public static boolean isDatasetFunction(FunctionIdentifier fi) {
- return datasetFunctions.contains(getAsterixFunctionInfo(fi));
+ public static IFunctionToDataSourceRewriter getDatasourceTransformer(FunctionIdentifier fi) {
+ return datasourceFunctions.getOrDefault(getAsterixFunctionInfo(fi), IFunctionToDataSourceRewriter.NOOP);
}
public static boolean isBuiltinCompilerFunction(FunctionSignature signature, boolean includePrivateFunctions) {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IFunctionToDataSourceRewriter.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IFunctionToDataSourceRewriter.java
new file mode 100644
index 0000000..2793c4d
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IFunctionToDataSourceRewriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.functions;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+
+public interface IFunctionToDataSourceRewriter {
+ public static final IFunctionToDataSourceRewriter NOOP = (o, c) -> false;
+
+ /**
+ * Replace the unnest operator by a datasource operator
+ *
+ * @param opRef
+ * UnnestOperator to be replaced by DataSourceScanOperator
+ * @param context
+ * optimization context
+ * @return true if transformed, false otherwise
+ * @throws AlgebricksException
+ */
+ boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException;
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java
index 031669a..e4a057f 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java
@@ -29,7 +29,7 @@
* A fully open record type which has the name OpenRecord
*/
public static final ARecordType FULLY_OPEN_RECORD_TYPE =
- new ARecordType("OpenRecord", new String[0], new IAType[0], true);
+ new ARecordType("AnyObject", new String[0], new IAType[0], true);
private RecordUtil() {
}
@@ -73,6 +73,7 @@
*/
public static int computeNullBitmapSize(ARecordType recordType) {
return NonTaggedFormatUtil.hasOptionalField(recordType)
- ? (int) Math.ceil(recordType.getFieldNames().length / 4.0) : 0;
+ ? (int) Math.ceil(recordType.getFieldNames().length / 4.0)
+ : 0;
}
}