[ASTERIXDB-2194][COMP] Introduce datasource functions

- user model changes: yes
  Some functions can be datasources
- storage format changes: no
- interface changes: yes
  - Add IDatasourceFunction: A function that is also a datasource
  - Add IFunctionToDataSourceTransformer: transform an unnest
    function into a datascan during compilation

Details:
- Currently, functions are location agnostic and are run on
  parameters that are either passed through them during compile
  time or runtime.
- An exception to this is the dataset function which has
  an associated location constraints running on the nodes
  which host the dataset.
- In this change, we introduce a general framework that allows
  creation of new functions similar to the dataset function.
- Such functions are called datasource Functions.
- A datasource function takes constant parameters and run on
  a set of partitions similar to the dataset function.
- The first example of such functions is the DatasetResources
  function.
- The DatasetResources function takes two parameters, a dataverse
  and a dataset. It is then run on all nodes and returns a set
  of dataset resources.
- Test cases are added for this function.

Change-Id: Ibcf807ac713a21e8f4d59868525467386e801303
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2216
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Tested-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index 24fe8e78..8dca64b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -84,14 +84,6 @@
         return fieldAccessFunctions.contains(fid);
     }
 
-    public static boolean isDataSetCall(ILogicalExpression e) {
-        if (((AbstractLogicalExpression) e).getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
-            return false;
-        }
-        AbstractFunctionCallExpression fe = (AbstractFunctionCallExpression) e;
-        return BuiltinFunctions.isDatasetFunction(fe.getFunctionIdentifier());
-    }
-
     public static boolean isRunnableAccessToFieldRecord(ILogicalExpression expr) {
         if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
             AbstractFunctionCallExpression fc = (AbstractFunctionCallExpression) expr;
@@ -129,17 +121,17 @@
 
     public static Pair<String, String> getDatasetInfo(AbstractDataSourceOperator op) throws AlgebricksException {
         DataSourceId srcId = (DataSourceId) op.getDataSource().getId();
-        return new Pair<String, String>(srcId.getDataverseName(), srcId.getDatasourceName());
+        return new Pair<>(srcId.getDataverseName(), srcId.getDatasourceName());
     }
 
     public static Pair<String, String> getExternalDatasetInfo(UnnestMapOperator op) throws AlgebricksException {
         AbstractFunctionCallExpression unnestExpr = (AbstractFunctionCallExpression) op.getExpressionRef().getValue();
         String dataverseName = AccessMethodUtils.getStringConstant(unnestExpr.getArguments().get(0));
         String datasetName = AccessMethodUtils.getStringConstant(unnestExpr.getArguments().get(1));
-        return new Pair<String, String>(dataverseName, datasetName);
+        return new Pair<>(dataverseName, datasetName);
     }
 
-    private static List<FunctionIdentifier> fieldAccessFunctions = new ArrayList<FunctionIdentifier>();
+    private static List<FunctionIdentifier> fieldAccessFunctions = new ArrayList<>();
 
     static {
         fieldAccessFunctions.add(BuiltinFunctions.GET_DATA);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
index f0917af..13ff0ee 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
@@ -87,7 +87,8 @@
             // https://issues.apache.org/jira/browse/ASTERIXDB-1618
             if (dataSource.getDatasourceType() != DataSource.Type.EXTERNAL_DATASET
                     && dataSource.getDatasourceType() != DataSource.Type.INTERNAL_DATASET
-                    && dataSource.getDatasourceType() != DataSource.Type.LOADABLE) {
+                    && dataSource.getDatasourceType() != DataSource.Type.LOADABLE
+                    && dataSource.getDatasourceType() != DataSource.Type.FUNCTION) {
                 IMutationDataSource mds = (IMutationDataSource) dataSource;
                 if (mds.isChange()) {
                     transformers = new ArrayList<>();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 4550ba6..0d02385 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -18,52 +18,17 @@
  */
 package org.apache.asterix.optimizer.rules;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.external.feed.watch.FeedActivityDetails;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.asterix.metadata.declared.DataSource;
-import org.apache.asterix.metadata.declared.DataSourceId;
-import org.apache.asterix.metadata.declared.FeedDataSource;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.FeedConnection;
-import org.apache.asterix.metadata.entities.FeedPolicyEntity;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.utils.ConstantExpressionUtil;
-import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
-import org.apache.asterix.translator.util.PlanTranslationUtil;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
@@ -77,213 +42,24 @@
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
+        AbstractFunctionCallExpression f = getFunctionCall(opRef);
+        if (f == null) {
+            return false;
+        }
+        return BuiltinFunctions.getDatasourceTransformer(f.getFunctionIdentifier()).rewrite(opRef, context);
+    }
+
+    public static AbstractFunctionCallExpression getFunctionCall(Mutable<ILogicalOperator> opRef) {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.UNNEST) {
-            return false;
+            return null;
         }
         UnnestOperator unnest = (UnnestOperator) op;
         ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
         if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
-            return false;
-        }
-        return handleFunction(opRef, context, unnest, (AbstractFunctionCallExpression) unnestExpr);
-    }
-
-    protected boolean handleFunction(Mutable<ILogicalOperator> opRef, IOptimizationContext context,
-            UnnestOperator unnest, AbstractFunctionCallExpression f) throws AlgebricksException {
-        FunctionIdentifier fid = f.getFunctionIdentifier();
-        if (fid.equals(BuiltinFunctions.DATASET)) {
-            if (unnest.getPositionalVariable() != null) {
-                // TODO remove this after enabling the support of positional variables in data scan
-                throw new AlgebricksException("No positional variables are allowed over datasets.");
-            }
-            ILogicalExpression expr = f.getArguments().get(0).getValue();
-            if (expr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
-                return false;
-            }
-            ConstantExpression ce = (ConstantExpression) expr;
-            IAlgebricksConstantValue acv = ce.getValue();
-            if (!(acv instanceof AsterixConstantValue)) {
-                return false;
-            }
-            AsterixConstantValue acv2 = (AsterixConstantValue) acv;
-            if (acv2.getObject().getType().getTypeTag() != ATypeTag.STRING) {
-                return false;
-            }
-            String datasetArg = ((AString) acv2.getObject()).getStringValue();
-            MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
-            Pair<String, String> datasetReference = parseDatasetReference(metadataProvider, datasetArg);
-            String dataverseName = datasetReference.first;
-            String datasetName = datasetReference.second;
-            Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
-            if (dataset == null) {
-                throw new AlgebricksException(
-                        "Could not find dataset " + datasetName + " in dataverse " + dataverseName);
-            }
-            DataSourceId asid = new DataSourceId(dataverseName, datasetName);
-            List<LogicalVariable> variables = new ArrayList<>();
-            if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-                int numPrimaryKeys = dataset.getPrimaryKeys().size();
-                for (int i = 0; i < numPrimaryKeys; i++) {
-                    variables.add(context.newVar());
-                }
-            }
-            variables.add(unnest.getVariable());
-            DataSource dataSource = metadataProvider.findDataSource(asid);
-            boolean hasMeta = dataSource.hasMeta();
-            if (hasMeta) {
-                variables.add(context.newVar());
-            }
-            DataSourceScanOperator scan = new DataSourceScanOperator(variables, dataSource);
-            List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
-            scanInpList.addAll(unnest.getInputs());
-            opRef.setValue(scan);
-            addPrimaryKey(variables, dataSource, context);
-            context.computeAndSetTypeEnvironmentForOperator(scan);
-            // Adds equivalence classes --- one equivalent class between a primary key
-            // variable and a record field-access expression.
-            IAType[] schemaTypes = dataSource.getSchemaTypes();
-            ARecordType recordType =
-                    (ARecordType) (hasMeta ? schemaTypes[schemaTypes.length - 2] : schemaTypes[schemaTypes.length - 1]);
-            ARecordType metaRecordType = (ARecordType) (hasMeta ? schemaTypes[schemaTypes.length - 1] : null);
-            EquivalenceClassUtils.addEquivalenceClassesForPrimaryIndexAccess(scan, variables, recordType,
-                    metaRecordType, dataset, context);
-            return true;
-        } else if (fid.equals(BuiltinFunctions.FEED_COLLECT)) {
-            if (unnest.getPositionalVariable() != null) {
-                throw new AlgebricksException("No positional variables are allowed over feeds.");
-            }
-            String dataverse = ConstantExpressionUtil.getStringArgument(f, 0);
-            String sourceFeedName = ConstantExpressionUtil.getStringArgument(f, 1);
-            String getTargetFeed = ConstantExpressionUtil.getStringArgument(f, 2);
-            String subscriptionLocation = ConstantExpressionUtil.getStringArgument(f, 3);
-            String targetDataset = ConstantExpressionUtil.getStringArgument(f, 4);
-            String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
-            MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
-            DataSourceId asid = new DataSourceId(dataverse, getTargetFeed);
-            String policyName = metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
-            FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, policyName);
-            if (policy == null) {
-                policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
-                if (policy == null) {
-                    throw new AlgebricksException("Unknown feed policy:" + policyName);
-                }
-            }
-            ArrayList<LogicalVariable> feedDataScanOutputVariables = new ArrayList<>();
-            String csLocations = metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
-            List<LogicalVariable> pkVars = new ArrayList<>();
-            FeedDataSource ds = createFeedDataSource(asid, targetDataset, sourceFeedName, subscriptionLocation,
-                    metadataProvider, policy, outputType, csLocations, unnest.getVariable(), context, pkVars);
-            // The order for feeds is <Record-Meta-PK>
-            feedDataScanOutputVariables.add(unnest.getVariable());
-            // Does it produce meta?
-            if (ds.hasMeta()) {
-                feedDataScanOutputVariables.add(context.newVar());
-            }
-            // Does it produce pk?
-            if (ds.isChange()) {
-                feedDataScanOutputVariables.addAll(pkVars);
-            }
-            DataSourceScanOperator scan = new DataSourceScanOperator(feedDataScanOutputVariables, ds);
-            List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
-            scanInpList.addAll(unnest.getInputs());
-            opRef.setValue(scan);
-            context.computeAndSetTypeEnvironmentForOperator(scan);
-            return true;
-        }
-        return false;
-    }
-
-    private void addPrimaryKey(List<LogicalVariable> scanVariables, DataSource dataSource,
-            IOptimizationContext context) {
-        List<LogicalVariable> primaryKey = dataSource.getPrimaryKeyVariables(scanVariables);
-        List<LogicalVariable> tail = new ArrayList<>();
-        tail.addAll(scanVariables);
-        FunctionalDependency pk = new FunctionalDependency(primaryKey, tail);
-        context.addPrimaryKey(pk);
-    }
-
-    private FeedDataSource createFeedDataSource(DataSourceId aqlId, String targetDataset, String sourceFeedName,
-            String subscriptionLocation, MetadataProvider metadataProvider, FeedPolicyEntity feedPolicy,
-            String outputType, String locations, LogicalVariable recordVar, IOptimizationContext context,
-            List<LogicalVariable> pkVars) throws AlgebricksException {
-        if (!aqlId.getDataverseName().equals(metadataProvider.getDefaultDataverse() == null ? null
-                : metadataProvider.getDefaultDataverse().getDataverseName())) {
             return null;
         }
-        Dataset dataset = metadataProvider.findDataset(aqlId.getDataverseName(), targetDataset);
-        ARecordType feedOutputType = (ARecordType) metadataProvider.findType(aqlId.getDataverseName(), outputType);
-        Feed sourceFeed = metadataProvider.findFeed(aqlId.getDataverseName(), sourceFeedName);
-        FeedConnection feedConnection =
-                metadataProvider.findFeedConnection(aqlId.getDataverseName(), sourceFeedName, targetDataset);
-        ARecordType metaType = null;
-        // Does dataset have meta?
-        if (dataset.hasMetaPart()) {
-            String metaTypeName = FeedUtils.getFeedMetaTypeName(sourceFeed.getAdapterConfiguration());
-            if (metaTypeName == null) {
-                throw new AlgebricksException("Feed to a dataset with metadata doesn't have meta type specified");
-            }
-            String dataverseName = aqlId.getDataverseName();
-            if (metaTypeName.contains(".")) {
-                dataverseName = metaTypeName.substring(0, metaTypeName.indexOf('.'));
-                metaTypeName = metaTypeName.substring(metaTypeName.indexOf('.') + 1);
-            }
-            metaType = (ARecordType) metadataProvider.findType(dataverseName, metaTypeName);
-        }
-        // Is a change feed?
-        List<IAType> pkTypes = null;
-        List<List<String>> partitioningKeys = null;
-        List<Integer> keySourceIndicator = null;
-        List<Mutable<ILogicalExpression>> keyAccessExpression = null;
-        List<ScalarFunctionCallExpression> keyAccessScalarFunctionCallExpression;
-        if (ExternalDataUtils.isChangeFeed(sourceFeed.getAdapterConfiguration())) {
-            keyAccessExpression = new ArrayList<>();
-            keyAccessScalarFunctionCallExpression = new ArrayList<>();
-            pkTypes = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPrimaryKeyType();
-            partitioningKeys = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
-            if (dataset.hasMetaPart()) {
-                keySourceIndicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
-            }
-            for (int i = 0; i < partitioningKeys.size(); i++) {
-                List<String> key = partitioningKeys.get(i);
-                if (keySourceIndicator == null || keySourceIndicator.get(i).intValue() == 0) {
-                    PlanTranslationUtil.prepareVarAndExpression(key, recordVar, pkVars, keyAccessExpression, null,
-                            context);
-                } else {
-                    PlanTranslationUtil.prepareMetaKeyAccessExpression(key, recordVar, keyAccessExpression, pkVars,
-                            null, context);
-                }
-            }
-            keyAccessExpression.forEach(
-                    expr -> keyAccessScalarFunctionCallExpression.add((ScalarFunctionCallExpression) expr.getValue()));
-        } else {
-            keyAccessExpression = null;
-            keyAccessScalarFunctionCallExpression = null;
-        }
-        FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) context.getMetadataProvider(), sourceFeed,
-                aqlId, targetDataset, feedOutputType, metaType, pkTypes, keyAccessScalarFunctionCallExpression,
-                sourceFeed.getFeedId(), FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
-                context.getComputationNodeDomain(), feedConnection);
-        feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
-        return feedDataSource;
+        return (AbstractFunctionCallExpression) unnestExpr;
     }
 
-    private Pair<String, String> parseDatasetReference(MetadataProvider metadataProvider, String datasetArg)
-            throws AlgebricksException {
-        String[] datasetNameComponents = datasetArg.split("\\.");
-        String dataverseName;
-        String datasetName;
-        if (datasetNameComponents.length == 1) {
-            Dataverse defaultDataverse = metadataProvider.getDefaultDataverse();
-            if (defaultDataverse == null) {
-                throw new AlgebricksException("Unresolved dataset " + datasetArg + " Dataverse not specified.");
-            }
-            dataverseName = defaultDataverse.getDataverseName();
-            datasetName = datasetNameComponents[0];
-        } else {
-            dataverseName = datasetNameComponents[0];
-            datasetName = datasetNameComponents[1];
-        }
-        return new Pair<>(dataverseName, datasetName);
-    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
index 7633f4c..eaea208 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
@@ -24,6 +24,7 @@
 import java.util.Set;
 
 import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.metadata.declared.DataSource;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -261,7 +262,12 @@
         Dataset dataset;
         // case 1: dataset scan
         if (scanOperator.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
-            dataset = ((DatasetDataSource)((DataSourceScanOperator)scanOperator).getDataSource()).getDataset();
+            DataSourceScanOperator dss = (DataSourceScanOperator) scanOperator;
+            DataSource ds = (DataSource) dss.getDataSource();
+            if (ds.getDatasourceType() != DataSource.Type.INTERNAL_DATASET) {
+                return null;
+            }
+            dataset = ((DatasetDataSource) ds).getDataset();
         } else {
             // case 2: dataset range search
             AbstractFunctionCallExpression primaryIndexFunctionCall =
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesDatasource.java
new file mode 100644
index 0000000..e6b025a
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesDatasource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class DatasetResourcesDatasource extends FunctionDataSource {
+    private final int datasetId;
+
+    public DatasetResourcesDatasource(INodeDomain domain, int datasetId) throws AlgebricksException {
+        super(new DataSourceId(DatasetResourcesRewriter.DATASET_RESOURCES.getNamespace(),
+                DatasetResourcesRewriter.DATASET_RESOURCES.getName()), domain);
+        this.datasetId = datasetId;
+    }
+
+    @Override
+    protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+            AlgebricksAbsolutePartitionConstraint locations) {
+        return new DatasetResourcesFunction(locations, datasetId);
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesFunction.java
new file mode 100644
index 0000000..05d192e
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class DatasetResourcesFunction extends AbstractDatasourceFunction {
+
+    private static final long serialVersionUID = 1L;
+    private final int datasetId;
+
+    public DatasetResourcesFunction(AlgebricksAbsolutePartitionConstraint locations, int datasetId) {
+        super(locations);
+        this.datasetId = datasetId;
+    }
+
+    @Override
+    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition) {
+        INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
+        INcApplicationContext appCtx = (INcApplicationContext) serviceCtx.getApplicationContext();
+        DatasetLifecycleManager dsLifecycleMgr = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager();
+        DatasetResource dsr = dsLifecycleMgr.getDatasetLifecycle(datasetId);
+        return new DatasetResourcesReader(dsr);
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesReader.java
new file mode 100644
index 0000000..bce2002
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesReader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class DatasetResourcesReader extends FunctionReader {
+
+    private final List<String> components;
+    private final Iterator<String> it;
+    private final CharArrayRecord record;
+
+    public DatasetResourcesReader(DatasetResource dsr) {
+        components = new ArrayList<>();
+        if (dsr != null && dsr.isOpen()) {
+            Map<Long, IndexInfo> indexes = dsr.getIndexes();
+            for (Entry<Long, IndexInfo> entry : indexes.entrySet()) {
+                IndexInfo value = entry.getValue();
+                ILSMIndex index = value.getIndex();
+                components.add(index.toString());
+            }
+            record = new CharArrayRecord();
+        } else {
+            record = null;
+        }
+        it = components.iterator();
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return it.hasNext();
+    }
+
+    @Override
+    public IRawRecord<char[]> next() throws IOException, InterruptedException {
+        record.reset();
+        record.append(it.next().toCharArray());
+        record.endRecord();
+        return record;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesRewriter.java
new file mode 100644
index 0000000..a575ba4
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesRewriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class DatasetResourcesRewriter extends FunctionRewriter {
+
+    // Parameters are dataverse name, and dataset name
+    public static final FunctionIdentifier DATASET_RESOURCES =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "dataset-resources", 2);
+    public static final DatasetResourcesRewriter INSTANCE = new DatasetResourcesRewriter(DATASET_RESOURCES);
+
+    private DatasetResourcesRewriter(FunctionIdentifier functionId) {
+        super(functionId);
+    }
+
+    @Override
+    public DatasetResourcesDatasource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+            throws AlgebricksException {
+        String dataverseName = getString(f.getArguments(), 0);
+        String datasetName = getString(f.getArguments(), 1);
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        return new DatasetResourcesDatasource(context.getComputationNodeDomain(), dataset.getDatasetId());
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
new file mode 100644
index 0000000..c857ce0
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter;
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+
+public class DatasetRewriter implements IFunctionToDataSourceRewriter, IResultTypeComputer {
+    public static final DatasetRewriter INSTANCE = new DatasetRewriter();
+
+    private DatasetRewriter() {
+    }
+
+    @Override
+    public boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        AbstractFunctionCallExpression f = UnnestToDataScanRule.getFunctionCall(opRef);
+        UnnestOperator unnest = (UnnestOperator) opRef.getValue();
+        if (unnest.getPositionalVariable() != null) {
+            // TODO remove this after enabling the support of positional variables in data scan
+            throw new AlgebricksException("No positional variables are allowed over datasets.");
+        }
+        ILogicalExpression expr = f.getArguments().get(0).getValue();
+        if (expr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+            return false;
+        }
+        ConstantExpression ce = (ConstantExpression) expr;
+        IAlgebricksConstantValue acv = ce.getValue();
+        if (!(acv instanceof AsterixConstantValue)) {
+            return false;
+        }
+        AsterixConstantValue acv2 = (AsterixConstantValue) acv;
+        if (acv2.getObject().getType().getTypeTag() != ATypeTag.STRING) {
+            return false;
+        }
+        String datasetArg = ((AString) acv2.getObject()).getStringValue();
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        Pair<String, String> datasetReference = parseDatasetReference(metadataProvider, datasetArg);
+        String dataverseName = datasetReference.first;
+        String datasetName = datasetReference.second;
+        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        DataSourceId asid = new DataSourceId(dataverseName, datasetName);
+        List<LogicalVariable> variables = new ArrayList<>();
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            int numPrimaryKeys = dataset.getPrimaryKeys().size();
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                variables.add(context.newVar());
+            }
+        }
+        variables.add(unnest.getVariable());
+        DataSource dataSource = metadataProvider.findDataSource(asid);
+        boolean hasMeta = dataSource.hasMeta();
+        if (hasMeta) {
+            variables.add(context.newVar());
+        }
+        DataSourceScanOperator scan = new DataSourceScanOperator(variables, dataSource);
+        List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+        scanInpList.addAll(unnest.getInputs());
+        opRef.setValue(scan);
+        addPrimaryKey(variables, dataSource, context);
+        context.computeAndSetTypeEnvironmentForOperator(scan);
+        // Adds equivalence classes --- one equivalent class between a primary key
+        // variable and a record field-access expression.
+        IAType[] schemaTypes = dataSource.getSchemaTypes();
+        ARecordType recordType =
+                (ARecordType) (hasMeta ? schemaTypes[schemaTypes.length - 2] : schemaTypes[schemaTypes.length - 1]);
+        ARecordType metaRecordType = (ARecordType) (hasMeta ? schemaTypes[schemaTypes.length - 1] : null);
+        EquivalenceClassUtils.addEquivalenceClassesForPrimaryIndexAccess(scan, variables, recordType, metaRecordType,
+                dataset, context);
+        return true;
+    }
+
+    private Pair<String, String> parseDatasetReference(MetadataProvider metadataProvider, String datasetArg)
+            throws AlgebricksException {
+        String[] datasetNameComponents = datasetArg.split("\\.");
+        String dataverseName;
+        String datasetName;
+        if (datasetNameComponents.length == 1) {
+            Dataverse defaultDataverse = metadataProvider.getDefaultDataverse();
+            if (defaultDataverse == null) {
+                throw new AlgebricksException("Unresolved dataset " + datasetArg + " Dataverse not specified.");
+            }
+            dataverseName = defaultDataverse.getDataverseName();
+            datasetName = datasetNameComponents[0];
+        } else {
+            dataverseName = datasetNameComponents[0];
+            datasetName = datasetNameComponents[1];
+        }
+        return new Pair<>(dataverseName, datasetName);
+    }
+
+    private void addPrimaryKey(List<LogicalVariable> scanVariables, DataSource dataSource,
+            IOptimizationContext context) {
+        List<LogicalVariable> primaryKey = dataSource.getPrimaryKeyVariables(scanVariables);
+        List<LogicalVariable> tail = new ArrayList<>();
+        tail.addAll(scanVariables);
+        FunctionalDependency pk = new FunctionalDependency(primaryKey, tail);
+        context.addPrimaryKey(pk);
+    }
+
+    @Override
+    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp)
+            throws AlgebricksException {
+        AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+        if (f.getArguments().size() != 1) {
+            throw new AlgebricksException("dataset arity is 1, not " + f.getArguments().size());
+        }
+        ILogicalExpression a1 = f.getArguments().get(0).getValue();
+        IAType t1 = (IAType) env.getType(a1);
+        if (t1.getTypeTag() == ATypeTag.ANY) {
+            return BuiltinType.ANY;
+        }
+        if (t1.getTypeTag() != ATypeTag.STRING) {
+            throw new AlgebricksException("Illegal type " + t1 + " for dataset() argument.");
+        }
+        String datasetArg = ConstantExpressionUtil.getStringConstant(a1);
+        if (datasetArg == null) {
+            return BuiltinType.ANY;
+        }
+        MetadataProvider metadata = (MetadataProvider) mp;
+        Pair<String, String> datasetInfo = DatasetUtil.getDatasetInfo(metadata, datasetArg);
+        String dataverseName = datasetInfo.first;
+        String datasetName = datasetInfo.second;
+        if (dataverseName == null) {
+            throw new AlgebricksException("Unspecified dataverse!");
+        }
+        Dataset dataset = metadata.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        String tn = dataset.getItemTypeName();
+        IAType t2 = metadata.findType(dataset.getItemTypeDataverseName(), tn);
+        if (t2 == null) {
+            throw new AlgebricksException("No type for dataset " + datasetName);
+        }
+        return t2;
+
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
new file mode 100644
index 0000000..ee3976e
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.external.feed.watch.FeedActivityDetails;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FeedDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter;
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.asterix.translator.util.PlanTranslationUtil;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+
+public class FeedRewriter implements IFunctionToDataSourceRewriter, IResultTypeComputer {
+    public static final FeedRewriter INSTANCE = new FeedRewriter();
+
+    private FeedRewriter() {
+    }
+
+    @Override
+    public boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        AbstractFunctionCallExpression f = UnnestToDataScanRule.getFunctionCall(opRef);
+        UnnestOperator unnest = (UnnestOperator) opRef.getValue();
+        if (unnest.getPositionalVariable() != null) {
+            throw new AlgebricksException("No positional variables are allowed over feeds.");
+        }
+        String dataverse = ConstantExpressionUtil.getStringArgument(f, 0);
+        String sourceFeedName = ConstantExpressionUtil.getStringArgument(f, 1);
+        String getTargetFeed = ConstantExpressionUtil.getStringArgument(f, 2);
+        String subscriptionLocation = ConstantExpressionUtil.getStringArgument(f, 3);
+        String targetDataset = ConstantExpressionUtil.getStringArgument(f, 4);
+        String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        DataSourceId asid = new DataSourceId(dataverse, getTargetFeed);
+        String policyName = metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
+        FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, policyName);
+        if (policy == null) {
+            policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
+            if (policy == null) {
+                throw new AlgebricksException("Unknown feed policy:" + policyName);
+            }
+        }
+        ArrayList<LogicalVariable> feedDataScanOutputVariables = new ArrayList<>();
+        String csLocations = metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
+        List<LogicalVariable> pkVars = new ArrayList<>();
+        FeedDataSource ds = createFeedDataSource(asid, targetDataset, sourceFeedName, subscriptionLocation,
+                metadataProvider, policy, outputType, csLocations, unnest.getVariable(), context, pkVars);
+        // The order for feeds is <Record-Meta-PK>
+        feedDataScanOutputVariables.add(unnest.getVariable());
+        // Does it produce meta?
+        if (ds.hasMeta()) {
+            feedDataScanOutputVariables.add(context.newVar());
+        }
+        // Does it produce pk?
+        if (ds.isChange()) {
+            feedDataScanOutputVariables.addAll(pkVars);
+        }
+        DataSourceScanOperator scan = new DataSourceScanOperator(feedDataScanOutputVariables, ds);
+        List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+        scanInpList.addAll(unnest.getInputs());
+        opRef.setValue(scan);
+        context.computeAndSetTypeEnvironmentForOperator(scan);
+        return true;
+    }
+
+    private FeedDataSource createFeedDataSource(DataSourceId aqlId, String targetDataset, String sourceFeedName,
+            String subscriptionLocation, MetadataProvider metadataProvider, FeedPolicyEntity feedPolicy,
+            String outputType, String locations, LogicalVariable recordVar, IOptimizationContext context,
+            List<LogicalVariable> pkVars) throws AlgebricksException {
+        Dataset dataset = metadataProvider.findDataset(aqlId.getDataverseName(), targetDataset);
+        ARecordType feedOutputType = (ARecordType) metadataProvider.findType(aqlId.getDataverseName(), outputType);
+        Feed sourceFeed = metadataProvider.findFeed(aqlId.getDataverseName(), sourceFeedName);
+        FeedConnection feedConnection =
+                metadataProvider.findFeedConnection(aqlId.getDataverseName(), sourceFeedName, targetDataset);
+        ARecordType metaType = null;
+        // Does dataset have meta?
+        if (dataset.hasMetaPart()) {
+            String metaTypeName = FeedUtils.getFeedMetaTypeName(sourceFeed.getAdapterConfiguration());
+            if (metaTypeName == null) {
+                throw new AlgebricksException("Feed to a dataset with metadata doesn't have meta type specified");
+            }
+            String dataverseName = aqlId.getDataverseName();
+            if (metaTypeName.contains(".")) {
+                dataverseName = metaTypeName.substring(0, metaTypeName.indexOf('.'));
+                metaTypeName = metaTypeName.substring(metaTypeName.indexOf('.') + 1);
+            }
+            metaType = (ARecordType) metadataProvider.findType(dataverseName, metaTypeName);
+        }
+        // Is a change feed?
+        List<IAType> pkTypes = null;
+        List<List<String>> partitioningKeys = null;
+        List<Integer> keySourceIndicator = null;
+
+        List<ScalarFunctionCallExpression> keyAccessScalarFunctionCallExpression;
+        if (ExternalDataUtils.isChangeFeed(sourceFeed.getAdapterConfiguration())) {
+            List<Mutable<ILogicalExpression>> keyAccessExpression = new ArrayList<>();
+            keyAccessScalarFunctionCallExpression = new ArrayList<>();
+            pkTypes = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPrimaryKeyType();
+            partitioningKeys = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
+            if (dataset.hasMetaPart()) {
+                keySourceIndicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+            }
+            for (int i = 0; i < partitioningKeys.size(); i++) {
+                List<String> key = partitioningKeys.get(i);
+                if (keySourceIndicator == null || keySourceIndicator.get(i).intValue() == 0) {
+                    PlanTranslationUtil.prepareVarAndExpression(key, recordVar, pkVars, keyAccessExpression, null,
+                            context);
+                } else {
+                    PlanTranslationUtil.prepareMetaKeyAccessExpression(key, recordVar, keyAccessExpression, pkVars,
+                            null, context);
+                }
+            }
+            keyAccessExpression.forEach(
+                    expr -> keyAccessScalarFunctionCallExpression.add((ScalarFunctionCallExpression) expr.getValue()));
+        } else {
+            keyAccessScalarFunctionCallExpression = null;
+        }
+        FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) context.getMetadataProvider(), sourceFeed,
+                aqlId, targetDataset, feedOutputType, metaType, pkTypes, keyAccessScalarFunctionCallExpression,
+                sourceFeed.getFeedId(), FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
+                context.getComputationNodeDomain(), feedConnection);
+        feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
+        return feedDataSource;
+    }
+
+    @Override
+    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp)
+            throws AlgebricksException {
+        AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+        if (f.getArguments().size() != BuiltinFunctions.FEED_COLLECT.getArity()) {
+            throw new AlgebricksException("Incorrect number of arguments -> arity is "
+                    + BuiltinFunctions.FEED_COLLECT.getArity() + ", not " + f.getArguments().size());
+        }
+        ILogicalExpression a1 = f.getArguments().get(5).getValue();
+        IAType t1 = (IAType) env.getType(a1);
+        if (t1.getTypeTag() == ATypeTag.ANY) {
+            return BuiltinType.ANY;
+        }
+        if (t1.getTypeTag() != ATypeTag.STRING) {
+            throw new AlgebricksException("Illegal type " + t1 + " for feed-ingest argument.");
+        }
+        String typeArg = ConstantExpressionUtil.getStringConstant(a1);
+        if (typeArg == null) {
+            return BuiltinType.ANY;
+        }
+        MetadataProvider metadata = (MetadataProvider) mp;
+        Pair<String, String> argInfo = DatasetUtil.getDatasetInfo(metadata, typeArg);
+        String dataverseName = argInfo.first;
+        String typeName = argInfo.second;
+        if (dataverseName == null) {
+            throw new AlgebricksException("Unspecified dataverse!");
+        }
+        IAType t2 = metadata.findType(dataverseName, typeName);
+        if (t2 == null) {
+            throw new AlgebricksException("Unknown type  " + typeName);
+        }
+        return t2;
+
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
new file mode 100644
index 0000000..c73f8e8
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class FunctionReader implements IRecordReader<char[]> {
+
+    @Override
+    public void close() throws IOException {
+        // No Op for function reader
+    }
+
+    @Override
+    public boolean stop() {
+        return true;
+    }
+
+    @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+        // No Op for function reader
+    }
+
+    @Override
+    public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionRewriter.java
new file mode 100644
index 0000000..2ff9282
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionRewriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+
+public abstract class FunctionRewriter implements IFunctionToDataSourceRewriter {
+
+    private FunctionIdentifier functionId;
+
+    public FunctionRewriter(FunctionIdentifier functionId) {
+        this.functionId = functionId;
+    }
+
+    @Override
+    public final boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractFunctionCallExpression f = UnnestToDataScanRule.getFunctionCall(opRef);
+        List<Mutable<ILogicalExpression>> args = f.getArguments();
+        if (args.size() != functionId.getArity()) {
+            throw new AlgebricksException("Function " + functionId.getNamespace() + "." + functionId.getName()
+                    + " expects " + functionId.getArity() + " arguments");
+        }
+        for (int i = 0; i < args.size(); i++) {
+            if (args.get(i).getValue().getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+                throw new AlgebricksException("Function " + functionId.getNamespace() + "." + functionId.getName()
+                        + " expects constant arguments while arg[" + i + "] is of type "
+                        + args.get(i).getValue().getExpressionTag());
+            }
+        }
+        UnnestOperator unnest = (UnnestOperator) opRef.getValue();
+        if (unnest.getPositionalVariable() != null) {
+            throw new AlgebricksException("No positional variables are allowed over datasource functions");
+        }
+        FunctionDataSource datasource = toDatasource(context, f);
+        List<LogicalVariable> variables = new ArrayList<>();
+        variables.add(unnest.getVariable());
+        DataSourceScanOperator scan = new DataSourceScanOperator(variables, datasource);
+        List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+        scanInpList.addAll(unnest.getInputs());
+        opRef.setValue(scan);
+        context.computeAndSetTypeEnvironmentForOperator(scan);
+        return true;
+    }
+
+    protected abstract FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+            throws AlgebricksException;
+
+    protected String getString(List<Mutable<ILogicalExpression>> args, int i) throws AlgebricksException {
+        ConstantExpression ce = (ConstantExpression) args.get(i).getValue();
+        IAlgebricksConstantValue acv = ce.getValue();
+        if (!(acv instanceof AsterixConstantValue)) {
+            throw new AlgebricksException("Expected arg[" + i + "] to be of type String");
+        }
+        AsterixConstantValue acv2 = (AsterixConstantValue) acv;
+        if (acv2.getObject().getType().getTypeTag() != ATypeTag.STRING) {
+            throw new AlgebricksException("Expected arg[" + i + "] to be of type String");
+        }
+        return ((AString) acv2.getObject()).getStringValue();
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsDatasource.java
new file mode 100644
index 0000000..7245b88
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsDatasource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class StorageComponentsDatasource extends FunctionDataSource {
+    private final int datasetId;
+
+    public StorageComponentsDatasource(INodeDomain domain, int datasetId) throws AlgebricksException {
+        super(new DataSourceId(StorageComponentsRewriter.STORAGE_COMPONENTS.getNamespace(),
+                StorageComponentsRewriter.STORAGE_COMPONENTS.getName()), domain);
+        this.datasetId = datasetId;
+    }
+
+    @Override
+    protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+            AlgebricksAbsolutePartitionConstraint locations) {
+        return new StorageComponentsFunction(locations, datasetId);
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsFunction.java
new file mode 100644
index 0000000..73b2d0e
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class StorageComponentsFunction extends AbstractDatasourceFunction {
+
+    private static final long serialVersionUID = 1L;
+    private final int datasetId;
+
+    public StorageComponentsFunction(AlgebricksAbsolutePartitionConstraint locations, int datasetId) {
+        super(locations);
+        this.datasetId = datasetId;
+    }
+
+    @Override
+    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
+        INcApplicationContext appCtx = (INcApplicationContext) serviceCtx.getApplicationContext();
+        DatasetLifecycleManager dsLifecycleMgr = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager();
+        DatasetResource dsr = dsLifecycleMgr.getDatasetLifecycle(datasetId);
+        return new StorageComponentsReader(dsr);
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsReader.java
new file mode 100644
index 0000000..4958d14
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsReader.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+
+public class StorageComponentsReader extends FunctionReader {
+
+    private final List<String> components;
+    private final Iterator<String> it;
+    private final CharArrayRecord record;
+
+    public StorageComponentsReader(DatasetResource dsr) throws HyracksDataException {
+        components = new ArrayList<>();
+        if (dsr != null && dsr.isOpen()) {
+            Map<Long, IndexInfo> indexes = dsr.getIndexes();
+            StringBuilder strBuilder = new StringBuilder();
+            for (Entry<Long, IndexInfo> entry : indexes.entrySet()) {
+                strBuilder.setLength(0);
+                IndexInfo value = entry.getValue();
+                ILSMIndex index = value.getIndex();
+                String path = value.getLocalResource().getPath();
+                strBuilder.append('{');
+                strBuilder.append("\"path\":\"");
+                strBuilder.append(path);
+                strBuilder.append("\", \"components\":[");
+                // syncronize over the opTracker
+                synchronized (index.getOperationTracker()) {
+                    List<ILSMDiskComponent> diskComponents = index.getDiskComponents();
+                    for (int i = diskComponents.size() - 1; i >= 0; i--) {
+                        if (i < diskComponents.size() - 1) {
+                            strBuilder.append(',');
+                        }
+                        ILSMDiskComponent c = diskComponents.get(i);
+                        LSMComponentId id = (LSMComponentId) c.getId();
+                        strBuilder.append('{');
+                        strBuilder.append("\"min\":");
+                        strBuilder.append(id.getMinId());
+                        strBuilder.append(",\"max\":");
+                        strBuilder.append(id.getMaxId());
+                        strBuilder.append('}');
+                    }
+                }
+                strBuilder.append("]}");
+                components.add(strBuilder.toString());
+            }
+            record = new CharArrayRecord();
+        } else {
+            record = null;
+        }
+        it = components.iterator();
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return it.hasNext();
+    }
+
+    @Override
+    public IRawRecord<char[]> next() throws IOException, InterruptedException {
+        record.reset();
+        record.append(it.next().toCharArray());
+        record.endRecord();
+        return record;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsRewriter.java
new file mode 100644
index 0000000..89bd115
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsRewriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class StorageComponentsRewriter extends FunctionRewriter {
+
+    // Parameters are dataverse name, and dataset name
+    public static final FunctionIdentifier STORAGE_COMPONENTS =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "storage-components", 2);
+    public static final StorageComponentsRewriter INSTANCE = new StorageComponentsRewriter(STORAGE_COMPONENTS);
+
+    private StorageComponentsRewriter(FunctionIdentifier functionId) {
+        super(functionId);
+    }
+
+    @Override
+    public StorageComponentsDatasource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+            throws AlgebricksException {
+        String dataverseName = getString(f.getArguments(), 0);
+        String datasetName = getString(f.getArguments(), 1);
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        return new StorageComponentsDatasource(context.getComputationNodeDomain(), dataset.getDatasetId());
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 670b2bd..622e28f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -82,6 +82,7 @@
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.util.MetadataBuiltinFunctions;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -137,6 +138,7 @@
         String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
         int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
         hcc = new HyracksConnection(strIP, port);
+        MetadataBuiltinFunctions.init();
         ILibraryManager libraryManager = new ExternalLibraryManager();
         ReplicationProperties repProp = new ReplicationProperties(
                 PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index a05b2bb..e1a75cb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -45,6 +45,7 @@
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
 import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.asterix.util.MetadataBuiltinFunctions;
 import org.apache.asterix.utils.CompatibilityUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -104,6 +105,7 @@
             System.setProperty("java.rmi.server.hostname",
                     (controllerService).getConfiguration().getClusterPublicAddress());
         }
+        MetadataBuiltinFunctions.init();
         runtimeContext = new NCAppRuntimeContext(ncServiceCtx, getExtensions());
         MetadataProperties metadataProperties = runtimeContext.getMetadataProperties();
         if (!metadataProperties.getNodeNames().contains(this.ncServiceCtx.getNodeId())) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
new file mode 100644
index 0000000..c143a63
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.util;
+
+import org.apache.asterix.app.function.DatasetResourcesRewriter;
+import org.apache.asterix.app.function.DatasetRewriter;
+import org.apache.asterix.app.function.FeedRewriter;
+import org.apache.asterix.app.function.StorageComponentsRewriter;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.utils.RecordUtil;
+
+public class MetadataBuiltinFunctions {
+
+    static {
+        // Dataset function
+        BuiltinFunctions.addFunction(BuiltinFunctions.DATASET, DatasetRewriter.INSTANCE, true);
+        BuiltinFunctions.addUnnestFun(BuiltinFunctions.DATASET, false);
+        BuiltinFunctions.addDatasourceFunction(BuiltinFunctions.DATASET, DatasetRewriter.INSTANCE);
+        // Feed collect function
+        BuiltinFunctions.addPrivateFunction(BuiltinFunctions.FEED_COLLECT, FeedRewriter.INSTANCE, true);
+        BuiltinFunctions.addUnnestFun(BuiltinFunctions.FEED_COLLECT, false);
+        BuiltinFunctions.addDatasourceFunction(BuiltinFunctions.FEED_COLLECT, FeedRewriter.INSTANCE);
+        // Dataset resources function
+        BuiltinFunctions.addPrivateFunction(DatasetResourcesRewriter.DATASET_RESOURCES,
+                (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
+        BuiltinFunctions.addUnnestFun(DatasetResourcesRewriter.DATASET_RESOURCES, false);
+        BuiltinFunctions.addDatasourceFunction(DatasetResourcesRewriter.DATASET_RESOURCES,
+                DatasetResourcesRewriter.INSTANCE);
+        // Dataset components function
+        BuiltinFunctions.addPrivateFunction(StorageComponentsRewriter.STORAGE_COMPONENTS,
+                (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
+        BuiltinFunctions.addUnnestFun(StorageComponentsRewriter.STORAGE_COMPONENTS, false);
+        BuiltinFunctions.addDatasourceFunction(StorageComponentsRewriter.STORAGE_COMPONENTS,
+                StorageComponentsRewriter.INSTANCE);
+
+    }
+
+    private MetadataBuiltinFunctions() {
+    }
+
+    public static void init() {
+        // Only execute the static block
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 9dddda4..ac249b7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -54,7 +54,6 @@
 import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
 import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
 import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedConstants;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.asterix.file.StorageComponentProvider;
@@ -87,6 +86,7 @@
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
 import org.apache.asterix.metadata.feeds.LocationConstraint;
+import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.job.listener.MultiTransactionJobletEventListenerFactory;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
@@ -202,7 +202,7 @@
                 addArgs(feedConnection.getDataverseName(), feedConnection.getFeedId().getEntityName(),
                         feedConnection.getFeedId().getEntityName(), FeedRuntimeType.INTAKE.toString(),
                         feedConnection.getDatasetName(), feedConnection.getOutputType());
-        CallExpr datasrouceCallFunction = new CallExpr(new FunctionSignature(FeedConstants.FEED_COLLECT_FUN), exprList);
+        CallExpr datasrouceCallFunction = new CallExpr(new FunctionSignature(BuiltinFunctions.FEED_COLLECT), exprList);
         FromTerm fromterm = new FromTerm(datasrouceCallFunction, fromTermLeftExpr, null, null);
         FromClause fromClause = new FromClause(Arrays.asList(fromterm));
         // TODO: This can be the place to add select predicate for ingestion
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.1.ddl.sqlpp
new file mode 100644
index 0000000..ca38c20
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.1.ddl.sqlpp
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+
+create type test.LineItemType as
+ closed {
+  l_orderkey : bigint,
+  l_partkey : bigint,
+  l_suppkey : bigint,
+  l_linenumber : bigint,
+  l_quantity : double,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+};
+
+create  dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
+create  index idx_partkey  on LineItem (l_partkey) type btree;
+
+create  primary index sec_primary_idx  on LineItem;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.2.update.sqlpp
new file mode 100644
index 0000000..546a831
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.3.query.sqlpp
new file mode 100644
index 0000000..53af94f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.3.query.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+set `import-private-functions` `true`;
+select value count(*) from dataset_resources('Metadata','Dataset') dsr;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.4.query.sqlpp
new file mode 100644
index 0000000..33b4b33
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.4.query.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+set `import-private-functions` `true`;
+select value (( select value count(*) from dataset_resources('test','LineItem') resource )[0] > 2);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.5.query.sqlpp
new file mode 100644
index 0000000..b396dd4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/dataset-resources/dataset-resources.5.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+set `import-private-functions` `true`;
+select value (( select value count(*) from storage_components('test','LineItem') resource )[0] > 2);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.3.adm
new file mode 100644
index 0000000..d00491f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.3.adm
@@ -0,0 +1 @@
+1
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.4.adm
new file mode 100644
index 0000000..27ba77d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.4.adm
@@ -0,0 +1 @@
+true
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.5.adm
new file mode 100644
index 0000000..27ba77d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.5.adm
@@ -0,0 +1 @@
+true
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 0c6493d..9ce7eb3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -3445,6 +3445,11 @@
   </test-group>
   <test-group name="misc">
     <test-case FilePath="misc">
+      <compilation-unit name="dataset-resources">
+        <output-dir compare="Text">dataset-resources</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="misc">
       <compilation-unit name="case_01">
         <output-dir compare="Text">case_01</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 1bb9c11..fc59f68 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.adapter.factory;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -36,6 +37,7 @@
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.asterix.external.dataset.adapter.GenericAdapter;
 import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.parser.factory.ADMDataParserFactory;
 import org.apache.asterix.external.provider.DataflowControllerProvider;
 import org.apache.asterix.external.provider.DatasourceFactoryProvider;
 import org.apache.asterix.external.provider.ParserFactoryProvider;
@@ -45,6 +47,7 @@
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.utils.RecordUtil;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -203,4 +206,20 @@
     public IExternalDataSourceFactory getDataSourceFactory() {
         return dataSourceFactory;
     }
+
+    /**
+     * Use pre-configured datasource factory
+     * For function datasources
+     *
+     * @param dataSourceFactory
+     *            the function datasource factory
+     * @throws AlgebricksException
+     */
+    public void configure(IExternalDataSourceFactory dataSourceFactory) throws AlgebricksException {
+        this.dataSourceFactory = dataSourceFactory;
+        dataParserFactory = new ADMDataParserFactory();
+        dataParserFactory.setRecordType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
+        dataParserFactory.configure(Collections.emptyMap());
+        configuration = Collections.emptyMap();
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
index e2fa6db..a29d66c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedConstants.java
@@ -18,13 +18,8 @@
  */
 package org.apache.asterix.external.util;
 
-import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-
 public class FeedConstants {
 
-    public static final FunctionIdentifier FEED_COLLECT_FUN = BuiltinFunctions.FEED_COLLECT;
-
     public final static String FEEDS_METADATA_DV = "feeds_metadata";
     public final static String FAILED_TUPLE_DATASET = "failed_tuple";
     public final static String FAILED_TUPLE_DATASET_TYPE = "FailedTupleType";
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IDatasourceFunction.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IDatasourceFunction.java
new file mode 100644
index 0000000..336c2dc
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IDatasourceFunction.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.api;
+
+import java.io.Serializable;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A function that is also a datasource
+ */
+public interface IDatasourceFunction extends Serializable {
+
+    /**
+     * @return the locations on which the function is to be run
+     */
+    AlgebricksAbsolutePartitionConstraint getPartitionConstraint();
+
+    /**
+     * The function record reader
+     *
+     * @param ctx
+     * @param partition
+     * @return
+     * @throws HyracksDataException
+     */
+    IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition) throws HyracksDataException;
+
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBuiltinEntities.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBuiltinEntities.java
index d796fed..c4f5bcb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBuiltinEntities.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBuiltinEntities.java
@@ -22,8 +22,6 @@
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.metadata.utils.MetadataUtil;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.utils.RecordUtil;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 
@@ -33,10 +31,8 @@
     public static final Dataverse DEFAULT_DATAVERSE =
             new Dataverse(DEFAULT_DATAVERSE_NAME, NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP);
     //--------------------------------------- Datatypes -----------------------------------------//
-    public static final ARecordType ANY_OBJECT_RECORD_TYPE =
-            new ARecordType("AnyObject", new String[0], new IAType[0], true);
     public static final Datatype ANY_OBJECT_DATATYPE = new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
-            ANY_OBJECT_RECORD_TYPE.getTypeName(), RecordUtil.FULLY_OPEN_RECORD_TYPE, false);
+            RecordUtil.FULLY_OPEN_RECORD_TYPE.getTypeName(), RecordUtil.FULLY_OPEN_RECORD_TYPE, false);
 
     private MetadataBuiltinEntities() {
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AbstractDatasourceFunction.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AbstractDatasourceFunction.java
new file mode 100644
index 0000000..0aff4c3
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AbstractDatasourceFunction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+
+public abstract class AbstractDatasourceFunction implements IDatasourceFunction {
+
+    private static final long serialVersionUID = 1L;
+    private final transient AlgebricksAbsolutePartitionConstraint locations;
+
+    public AbstractDatasourceFunction(AlgebricksAbsolutePartitionConstraint locations) {
+        this.locations = locations;
+    }
+
+    @Override
+    public final AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        return locations;
+    }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSource.java
index fa874d5..ca22567 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSource.java
@@ -57,6 +57,7 @@
         public static final byte EXTERNAL_DATASET = 0x01;
         public static final byte FEED = 0x02;
         public static final byte LOADABLE = 0x03;
+        public static final byte FUNCTION = 0x04;
 
         // Hide implicit public constructor
         private Type() {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
new file mode 100644
index 0000000..d2b9871
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public abstract class FunctionDataSource extends DataSource {
+
+    public FunctionDataSource(DataSourceId id, INodeDomain domain) throws AlgebricksException {
+        super(id, RecordUtil.FULLY_OPEN_RECORD_TYPE, null, DataSource.Type.FUNCTION, domain);
+        schemaTypes = new IAType[] { itemType };
+    }
+
+    @Override
+    public boolean isScanAccessPathALeaf() {
+        return true;
+    }
+
+    @Override
+    public IDataSourcePropertiesProvider getPropertiesProvider() {
+        // Unordered Random partitioning on all nodes
+        return scanVariables -> new StructuralPropertiesVector(new RandomPartitioningProperty(domain),
+                Collections.emptyList());
+    }
+
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceScanRuntime(
+            MetadataProvider metadataProvider, IDataSource<DataSourceId> dataSource,
+            List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
+            List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
+            IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
+            throws AlgebricksException {
+        GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
+        adapterFactory.setOutputType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
+        IClusterStateManager csm = metadataProvider.getApplicationContext().getClusterStateManager();
+        FunctionDataSourceFactory factory =
+                new FunctionDataSourceFactory(createFunction(metadataProvider, getLocations(csm)));
+        adapterFactory.configure(factory);
+        return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory);
+    }
+
+    protected abstract IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+            AlgebricksAbsolutePartitionConstraint locations);
+
+    protected static AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
+        String[] allPartitions = csm.getClusterLocations().getLocations();
+        Set<String> ncs = new HashSet<>(Arrays.asList(allPartitions));
+        return new AlgebricksAbsolutePartitionConstraint(ncs.toArray(new String[ncs.size()]));
+    }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
new file mode 100644
index 0000000..1e0ddbf
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSourceFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FunctionDataSourceFactory implements IRecordReaderFactory<char[]> {
+
+    private static final long serialVersionUID = 1L;
+    private final IDatasourceFunction function;
+
+    public FunctionDataSourceFactory(IDatasourceFunction function) {
+        this.function = function;
+    }
+
+    @Override
+    public final DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
+        return function.getPartitionConstraint();
+    }
+
+    @Override
+    public void configure(IServiceContext ctx, Map<String, String> configuration)
+            throws AlgebricksException, HyracksDataException {
+        // No Op
+    }
+
+    @Override
+    public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        return function.createRecordReader(ctx, partition);
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return char[].class;
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return Collections.emptyList();
+    }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java
deleted file mode 100644
index 137e625..0000000
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/MetadataBuiltinFunctions.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.functions;
-
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.utils.ConstantExpressionUtil;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
-
-public class MetadataBuiltinFunctions {
-
-    static {
-        addMetadataBuiltinFunctions();
-        BuiltinFunctions.addUnnestFun(BuiltinFunctions.DATASET, false);
-        BuiltinFunctions.addDatasetFunction(BuiltinFunctions.DATASET);
-        BuiltinFunctions.addUnnestFun(BuiltinFunctions.FEED_COLLECT, false);
-        BuiltinFunctions.addDatasetFunction(BuiltinFunctions.FEED_COLLECT);
-        BuiltinFunctions.addUnnestFun(BuiltinFunctions.FEED_INTERCEPT, false);
-        BuiltinFunctions.addDatasetFunction(BuiltinFunctions.FEED_INTERCEPT);
-    }
-
-    public static void addMetadataBuiltinFunctions() {
-
-        BuiltinFunctions.addFunction(BuiltinFunctions.DATASET, new IResultTypeComputer() {
-
-            @Override
-            public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
-                    IMetadataProvider<?, ?> mp) throws AlgebricksException {
-                AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
-                if (f.getArguments().size() != 1) {
-                    throw new AlgebricksException("dataset arity is 1, not " + f.getArguments().size());
-                }
-                ILogicalExpression a1 = f.getArguments().get(0).getValue();
-                IAType t1 = (IAType) env.getType(a1);
-                if (t1.getTypeTag() == ATypeTag.ANY) {
-                    return BuiltinType.ANY;
-                }
-                if (t1.getTypeTag() != ATypeTag.STRING) {
-                    throw new AlgebricksException("Illegal type " + t1 + " for dataset() argument.");
-                }
-                String datasetArg = ConstantExpressionUtil.getStringConstant(a1);
-                if (datasetArg == null) {
-                    return BuiltinType.ANY;
-                }
-                MetadataProvider metadata = (MetadataProvider) mp;
-                Pair<String, String> datasetInfo = getDatasetInfo(metadata, datasetArg);
-                String dataverseName = datasetInfo.first;
-                String datasetName = datasetInfo.second;
-                if (dataverseName == null) {
-                    throw new AlgebricksException("Unspecified dataverse!");
-                }
-                Dataset dataset = metadata.findDataset(dataverseName, datasetName);
-                if (dataset == null) {
-                    throw new AlgebricksException(
-                            "Could not find dataset " + datasetName + " in dataverse " + dataverseName);
-                }
-                String tn = dataset.getItemTypeName();
-                IAType t2 = metadata.findType(dataset.getItemTypeDataverseName(), tn);
-                if (t2 == null) {
-                    throw new AlgebricksException("No type for dataset " + datasetName);
-                }
-                return t2;
-            }
-        }, true);
-
-        BuiltinFunctions.addPrivateFunction(BuiltinFunctions.FEED_COLLECT, new IResultTypeComputer() {
-
-            @Override
-            public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
-                    IMetadataProvider<?, ?> mp) throws AlgebricksException {
-                AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
-                if (f.getArguments().size() != BuiltinFunctions.FEED_COLLECT.getArity()) {
-                    throw new AlgebricksException("Incorrect number of arguments -> arity is "
-                            + BuiltinFunctions.FEED_COLLECT.getArity() + ", not " + f.getArguments().size());
-                }
-                ILogicalExpression a1 = f.getArguments().get(5).getValue();
-                IAType t1 = (IAType) env.getType(a1);
-                if (t1.getTypeTag() == ATypeTag.ANY) {
-                    return BuiltinType.ANY;
-                }
-                if (t1.getTypeTag() != ATypeTag.STRING) {
-                    throw new AlgebricksException("Illegal type " + t1 + " for feed-ingest argument.");
-                }
-                String typeArg = ConstantExpressionUtil.getStringConstant(a1);
-                if (typeArg == null) {
-                    return BuiltinType.ANY;
-                }
-                MetadataProvider metadata = (MetadataProvider) mp;
-                Pair<String, String> argInfo = getDatasetInfo(metadata, typeArg);
-                String dataverseName = argInfo.first;
-                String typeName = argInfo.second;
-                if (dataverseName == null) {
-                    throw new AlgebricksException("Unspecified dataverse!");
-                }
-                IAType t2 = metadata.findType(dataverseName, typeName);
-                if (t2 == null) {
-                    throw new AlgebricksException("Unknown type  " + typeName);
-                }
-                return t2;
-            }
-        }, true);
-
-        BuiltinFunctions.addFunction(BuiltinFunctions.FEED_INTERCEPT, new IResultTypeComputer() {
-
-            @Override
-            public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
-                    IMetadataProvider<?, ?> mp) throws AlgebricksException {
-                AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
-                if (f.getArguments().size() != 1) {
-                    throw new AlgebricksException("dataset arity is 1, not " + f.getArguments().size());
-                }
-                ILogicalExpression a1 = f.getArguments().get(0).getValue();
-                IAType t1 = (IAType) env.getType(a1);
-                if (t1.getTypeTag() == ATypeTag.ANY) {
-                    return BuiltinType.ANY;
-                }
-                if (t1.getTypeTag() != ATypeTag.STRING) {
-                    throw new AlgebricksException("Illegal type " + t1 + " for dataset() argument.");
-                }
-                String datasetArg = ConstantExpressionUtil.getStringConstant(a1);
-                if (datasetArg == null) {
-                    return BuiltinType.ANY;
-                }
-                MetadataProvider metadata = (MetadataProvider) mp;
-                Pair<String, String> datasetInfo = getDatasetInfo(metadata, datasetArg);
-                String dataverseName = datasetInfo.first;
-                String datasetName = datasetInfo.second;
-                if (dataverseName == null) {
-                    throw new AlgebricksException("Unspecified dataverse!");
-                }
-                Dataset dataset = metadata.findDataset(dataverseName, datasetName);
-                if (dataset == null) {
-                    throw new AlgebricksException(
-                            "Could not find dataset " + datasetName + " in dataverse " + dataverseName);
-                }
-                String tn = dataset.getItemTypeName();
-                IAType t2 = metadata.findType(dataset.getItemTypeDataverseName(), tn);
-                if (t2 == null) {
-                    throw new AlgebricksException("No type for dataset " + datasetName);
-                }
-                return t2;
-            }
-        }, true);
-    }
-
-    private static Pair<String, String> getDatasetInfo(MetadataProvider metadata, String datasetArg) {
-        String[] nameComponents = datasetArg.split("\\.");
-        String first;
-        String second;
-        if (nameComponents.length == 1) {
-            first = metadata.getDefaultDataverse() == null ? null : metadata.getDefaultDataverse().getDataverseName();
-            second = nameComponents[0];
-        } else {
-            first = nameComponents[0];
-            second = nameComponents[1];
-        }
-        return new Pair<String, String>(first, second);
-    }
-}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index cf663eb..e38894c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -547,4 +547,19 @@
         MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodeGroup, new ArrayList<>(ncNames)));
         return nodeGroup;
     }
+
+    // This doesn't work if the dataset  or the dataverse name contains a '.'
+    public static Pair<String, String> getDatasetInfo(MetadataProvider metadata, String datasetArg) {
+        String[] nameComponents = datasetArg.split("\\.");
+        String first;
+        String second;
+        if (nameComponents.length == 1) {
+            first = metadata.getDefaultDataverse() == null ? null : metadata.getDefaultDataverse().getDataverseName();
+            second = nameComponents[0];
+        } else {
+            first = nameComponents[0];
+            second = nameComponents[1];
+        }
+        return new Pair<>(first, second);
+    }
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index e1f5bb0..dde61f9 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -134,7 +134,7 @@
     private static final Map<IFunctionInfo, IResultTypeComputer> funTypeComputer = new HashMap<>();
 
     private static final Set<IFunctionInfo> builtinAggregateFunctions = new HashSet<>();
-    private static final Set<IFunctionInfo> datasetFunctions = new HashSet<>();
+    private static final Map<IFunctionInfo, IFunctionToDataSourceRewriter> datasourceFunctions = new HashMap<>();
     private static final Set<IFunctionInfo> similarityFunctions = new HashSet<>();
     private static final Set<IFunctionInfo> globalAggregateFunctions = new HashSet<>();
     private static final Map<IFunctionInfo, IFunctionInfo> aggregateToLocalAggregate = new HashMap<>();
@@ -1280,13 +1280,6 @@
         // unnesting function
         addPrivateFunction(SCAN_COLLECTION, CollectionMemberResultType.INSTANCE, true);
 
-        String metadataFunctionLoaderClassName = "org.apache.asterix.metadata.functions.MetadataBuiltinFunctions";
-        try {
-            Class.forName(metadataFunctionLoaderClassName);
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
-        }
-
     }
 
     static {
@@ -1527,27 +1520,17 @@
     }
 
     static {
-        datasetFunctions.add(getAsterixFunctionInfo(DATASET));
-        datasetFunctions.add(getAsterixFunctionInfo(FEED_COLLECT));
-        datasetFunctions.add(getAsterixFunctionInfo(FEED_INTERCEPT));
-        datasetFunctions.add(getAsterixFunctionInfo(INDEX_SEARCH));
-    }
-
-    static {
-        addUnnestFun(DATASET, false);
-        addUnnestFun(FEED_COLLECT, false);
-        addUnnestFun(FEED_INTERCEPT, false);
         addUnnestFun(RANGE, true);
         addUnnestFun(SCAN_COLLECTION, false);
         addUnnestFun(SUBSET_COLLECTION, false);
     }
 
-    public static void addDatasetFunction(FunctionIdentifier fi) {
-        datasetFunctions.add(getAsterixFunctionInfo(fi));
+    public static void addDatasourceFunction(FunctionIdentifier fi, IFunctionToDataSourceRewriter transformer) {
+        datasourceFunctions.put(getAsterixFunctionInfo(fi), transformer);
     }
 
-    public static boolean isDatasetFunction(FunctionIdentifier fi) {
-        return datasetFunctions.contains(getAsterixFunctionInfo(fi));
+    public static IFunctionToDataSourceRewriter getDatasourceTransformer(FunctionIdentifier fi) {
+        return datasourceFunctions.getOrDefault(getAsterixFunctionInfo(fi), IFunctionToDataSourceRewriter.NOOP);
     }
 
     public static boolean isBuiltinCompilerFunction(FunctionSignature signature, boolean includePrivateFunctions) {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IFunctionToDataSourceRewriter.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IFunctionToDataSourceRewriter.java
new file mode 100644
index 0000000..2793c4d
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IFunctionToDataSourceRewriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.functions;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+
+public interface IFunctionToDataSourceRewriter {
+    public static final IFunctionToDataSourceRewriter NOOP = (o, c) -> false;
+
+    /**
+     * Replace the unnest operator by a datasource operator
+     *
+     * @param opRef
+     *            UnnestOperator to be replaced by DataSourceScanOperator
+     * @param context
+     *            optimization context
+     * @return true if transformed, false otherwise
+     * @throws AlgebricksException
+     */
+    boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException;
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java
index 031669a..e4a057f 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java
@@ -29,7 +29,7 @@
      * A fully open record type which has the name OpenRecord
      */
     public static final ARecordType FULLY_OPEN_RECORD_TYPE =
-            new ARecordType("OpenRecord", new String[0], new IAType[0], true);
+            new ARecordType("AnyObject", new String[0], new IAType[0], true);
 
     private RecordUtil() {
     }
@@ -73,6 +73,7 @@
      */
     public static int computeNullBitmapSize(ARecordType recordType) {
         return NonTaggedFormatUtil.hasOptionalField(recordType)
-                ? (int) Math.ceil(recordType.getFieldNames().length / 4.0) : 0;
+                ? (int) Math.ceil(recordType.getFieldNames().length / 4.0)
+                : 0;
     }
 }