[ASTERIXDB-3603][FUN] Add support for transform functions

- user model changes: yes
- storage format changes: no
- interface changes: yes

Ext-ref: MB-63039
Change-Id: If10b15ebd2e9b4ac7cac3b68ca4b301a6b58c30e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19832
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/FunctionCardinalityInferenceVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/FunctionCardinalityInferenceVisitor.java
new file mode 100644
index 0000000..03ef29e
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/visitor/FunctionCardinalityInferenceVisitor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.visitor;
+
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.CardinalityInferenceVisitor;
+
+public class FunctionCardinalityInferenceVisitor extends CardinalityInferenceVisitor {
+
+    private FunctionCardinalityInferenceVisitor() {
+        super();
+    }
+
+    public static boolean isCardinalityZeroOrOne(ILogicalOperator operator) throws AlgebricksException {
+        FunctionCardinalityInferenceVisitor visitor = new FunctionCardinalityInferenceVisitor();
+        long cardinality = operator.accept(visitor, null);
+        return cardinality == ZERO_OR_ONE || cardinality == ONE;
+    }
+
+    @Override
+    public Long visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+        // While translation a limit expression is encapsulated as a switch-case expression.
+        // switch-case(treat-as-integer(user_value_expr) > 0, true, treat-as-integer(user_value_expr), 0)
+        // Here we extract the user_value_expr from the switch-case expression
+        ScalarFunctionCallExpression inputExpr = (ScalarFunctionCallExpression) op.getMaxObjects().getValue();
+        ILogicalExpression expr = ((ScalarFunctionCallExpression) inputExpr.getArguments().get(2).getValue())
+                .getArguments().get(0).getValue();
+        if (expr.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+            AsterixConstantValue constantValue = (AsterixConstantValue) ((ConstantExpression) expr).getValue();
+            if (constantValue.getObject() instanceof AInt64
+                    && ((AInt64) constantValue.getObject()).getLongValue() <= 1) {
+                return ZERO_OR_ONE;
+            }
+        }
+        return adjustCardinalityForTupleReductionOperator(op.getInputs().get(0).getValue().accept(this, arg));
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index e3e78e0..ed62ba3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -488,6 +488,16 @@
         return isFileStore ? String.valueOf(ExternalWriterProvider.getSeparator(adapter)) : "";
     }
 
+    private LogicalVariable getUnnestVar(ILogicalOperator op) {
+        while (op.getOperatorTag() != LogicalOperatorTag.UNNEST && !op.getInputs().isEmpty()) {
+            op = op.getInputs().get(0).getValue();
+        }
+        if (op.getOperatorTag() == LogicalOperatorTag.UNNEST) {
+            return ((UnnestOperator) op).getVariable();
+        }
+        return null;
+    }
+
     public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt,
             ILogicalOperator baseOp, IResultMetadata resultMetadata) throws AlgebricksException {
         MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
@@ -500,8 +510,11 @@
         ILogicalOperator topOp = p.first;
         List<LogicalVariable> liveVars = new ArrayList<>();
         VariableUtilities.getLiveVariables(topOp, liveVars);
-        LogicalVariable unnestVar = liveVars.get(0);
-        LogicalVariable resVar = unnestVar;
+        LogicalVariable unnestVar = getUnnestVar(topOp);
+        if (unnestVar == null) {
+            unnestVar = liveVars.get(0);
+        }
+        LogicalVariable resVar = liveVars.get(0);
 
         if (outputDatasetName == null) {
             FileSplit outputFileSplit = metadataProvider.getOutputFile();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index fee660d..547afec 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -48,6 +48,8 @@
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.NoRetryPolicyFactory;
+import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
+import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
 import org.apache.asterix.algebra.extension.ExtensionStatement;
 import org.apache.asterix.api.common.APIFramework;
 import org.apache.asterix.api.http.server.AbstractQueryApiServlet;
@@ -223,6 +225,7 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeSignature;
 import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.optimizer.rules.visitor.FunctionCardinalityInferenceVisitor;
 import org.apache.asterix.runtime.fulltext.AbstractFullTextFilterDescriptor;
 import org.apache.asterix.runtime.fulltext.FullTextConfigDescriptor;
 import org.apache.asterix.runtime.fulltext.StopwordsFullTextFilterDescriptor;
@@ -258,6 +261,9 @@
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.Counter;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
@@ -3340,10 +3346,14 @@
                 function = new Function(functionSignature, paramNames, paramTypes, returnTypeSignature, null,
                         FunctionKind.SCALAR.toString(), library.getLanguage(), libraryDatabaseName,
                         libraryDataverseName, libraryName, externalIdentifier, cfs.getNullCall(),
-                        cfs.getDeterministic(), cfs.getResources(), dependencies, creator);
+                        cfs.getDeterministic(), cfs.getResources(), dependencies, creator, false);
             } else {
                 List<Pair<VarIdentifier, TypeExpression>> paramList = cfs.getParameters();
                 int paramCount = paramList.size();
+                if (cfs.isTransform() && paramCount != 1) {
+                    throw new CompilationException(ErrorCode.INVALID_TRANSFORM_FUNCTION, sourceLoc,
+                            "Transform function should have exactly one parameter");
+                }
                 List<VarIdentifier> paramVars = new ArrayList<>(paramCount);
                 List<String> paramNames = new ArrayList<>(paramCount);
                 for (Pair<VarIdentifier, TypeExpression> paramPair : paramList) {
@@ -3359,7 +3369,8 @@
                 // Check whether the function is usable:
                 // create a function declaration for this function,
                 // and a query body calls this function with each argument set to 'missing'
-                FunctionDecl fd = new FunctionDecl(functionSignature, paramVars, cfs.getFunctionBodyExpression(), true);
+                FunctionDecl fd = new FunctionDecl(functionSignature, paramVars, cfs.getFunctionBodyExpression(), true,
+                        cfs.isTransform());
                 fd.setSourceLocation(sourceLoc);
 
                 Query wrappedQuery = queryRewriter.createFunctionAccessorQuery(fd);
@@ -3369,17 +3380,25 @@
                 metadataProvider.setDefaultNamespace(ns);
                 LangRewritingContext langRewritingContext = createLangRewritingContext(metadataProvider, fdList, null,
                         null, warningCollector, wrappedQuery.getVarCounter());
-                apiFramework.reWriteQuery(langRewritingContext, wrappedQuery, sessionOutput, false, false,
-                        Collections.emptyList());
+                List<VarIdentifier> externalVars = new ArrayList<>();
+                Pair<IReturningStatement, Integer> rewrittenQuery = apiFramework.reWriteQuery(langRewritingContext,
+                        wrappedQuery, sessionOutput, false, true, externalVars);
 
                 List<List<DependencyFullyQualifiedName>> dependencies =
                         FunctionUtil.getFunctionDependencies(metadataProvider, fd, queryRewriter);
+                if (cfs.isTransform()) {
+                    if (!dependencies.get(0).isEmpty()) {
+                        throw new CompilationException(ErrorCode.INVALID_TRANSFORM_FUNCTION, sourceLoc,
+                                "Transform function definition can not use collections/views");
+                    }
+                    validateTransformFunction(metadataProvider, rewrittenQuery, sourceLoc);
+                }
                 appCtx.getReceptionist().ensureAuthorized(requestParameters, metadataProvider);
 
                 newInlineTypes = Collections.emptyMap();
                 function = new Function(functionSignature, paramNames, null, null, cfs.getFunctionBody(),
                         FunctionKind.SCALAR.toString(), compilationProvider.getParserFactory().getLanguage(), null,
-                        null, null, null, null, null, null, dependencies, creator);
+                        null, null, null, null, null, null, dependencies, creator, cfs.isTransform());
             }
 
             if (existingFunction == null) {
@@ -3418,6 +3437,31 @@
         }
     }
 
+    private void validateTransformFunction(MetadataProvider metadataProvider,
+            Pair<IReturningStatement, Integer> rewrittenResult, SourceLocation sourceLoc) throws AlgebricksException {
+        ILangExpressionToPlanTranslatorFactory translatorFactory =
+                compilationProvider.getExpressionToPlanTranslatorFactory();
+        ILangExpressionToPlanTranslator t =
+                translatorFactory.createExpressionToPlanTranslator(metadataProvider, rewrittenResult.second, null);
+        org.apache.asterix.translator.ResultMetadata resultMetadata =
+                new org.apache.asterix.translator.ResultMetadata(sessionOutput.config().fmt());
+        ILogicalPlan plan = t.translate((Query) rewrittenResult.first, null, null, resultMetadata);
+        if (plan.getRoots().size() != 1) {
+            throw new CompilationException(ErrorCode.INVALID_TRANSFORM_FUNCTION, sourceLoc,
+                    "Transform function cannot have more than one root");
+        }
+        ILogicalOperator op = plan.getRoots().get(0).getValue().getInputs().get(0).getValue();
+        if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+            if (!FunctionCardinalityInferenceVisitor.isCardinalityZeroOrOne(op.getInputs().get(0).getValue())) {
+                throw new CompilationException(ErrorCode.INVALID_TRANSFORM_FUNCTION, sourceLoc,
+                        "Transform function cannot return more than one row");
+            }
+        } else {
+            throw new CompilationException(ErrorCode.INVALID_TRANSFORM_FUNCTION, sourceLoc,
+                    "Transform function should always contain a query");
+        }
+    }
+
     private Triple<TypeSignature, TypeSignature, Datatype> translateFunctionParameterType(
             FunctionSignature functionSignature, int paramIdx, TypeExpression paramTypeExpr, SourceLocation sourceLoc,
             MetadataProvider metadataProvider, MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.000.ddl.sqlpp
new file mode 100644
index 0000000..39b0abd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.000.ddl.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+CREATE TYPE T1 AS {
+    a: int32,
+    b: int32
+};
+
+CREATE COLLECTION test_collection(T1) PRIMARY KEY a;
+
+CREATE VIEW test_view AS
+    SELECT * FROM test_collection;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.001.ddl.sqlpp
new file mode 100644
index 0000000..07f2fb1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.001.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+-- Fail: More than one argument
+
+CREATE TRANSFORM FUNCTION transformTest(a, b) {
+    SELECT * FROM [{"a": 1, "b": 2}] t
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.002.ddl.sqlpp
new file mode 100644
index 0000000..61696ad
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.002.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+-- Fail: Less than one argument
+
+CREATE TRANSFORM FUNCTION transformTest() {
+    SELECT * FROM [{"a": 1, "b": 2}] t
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.003.ddl.sqlpp
new file mode 100644
index 0000000..dc1a277
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.003.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+-- Fail: Using a collection in the definition
+
+CREATE TRANSFORM FUNCTION transformTest(doc) {
+    SELECT count(*) as count FROM test_collection
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.004.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.004.ddl.sqlpp
new file mode 100644
index 0000000..7ad55bd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.004.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+-- Fail: Using a view in the definition
+
+CREATE TRANSFORM FUNCTION transformTest(doc) {
+    SELECT count(*) FROM test_view
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.005.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.005.ddl.sqlpp
new file mode 100644
index 0000000..9e4e7b9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/negative/transform.005.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+CREATE TRANSFORM FUNCTION transformTest(doc) {
+    SELECT * FROM [doc] d UNNEST d.a
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.000.ddl.sqlpp
new file mode 100644
index 0000000..39b0abd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.000.ddl.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+CREATE TYPE T1 AS {
+    a: int32,
+    b: int32
+};
+
+CREATE COLLECTION test_collection(T1) PRIMARY KEY a;
+
+CREATE VIEW test_view AS
+    SELECT * FROM test_collection;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
new file mode 100644
index 0000000..f03a74d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+CREATE TRANSFORM FUNCTION transformTest1(doc) {
+    SELECT count(*) as count FROM [doc] t UNNEST t.a
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
new file mode 100644
index 0000000..a04ea00
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+CREATE TRANSFORM FUNCTION transformTest2(doc) {
+    SELECT (SELECT * FROM [doc] t UNNEST t.a) as tt
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
new file mode 100644
index 0000000..0770d98
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+CREATE TRANSFORM FUNCTION transformTest3(doc) {
+    SELECT * FROM [doc] t UNNEST t.a LIMIT 1
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
new file mode 100644
index 0000000..d2c2265
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+CREATE TRANSFORM FUNCTION transformTest4(cust) {
+    SELECT VALUE c
+      FROM [cust] AS c
+      WHERE c.address.zipcode IS NOT MISSING
+      LIMIT 1
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.005.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.005.ddl.sqlpp
new file mode 100644
index 0000000..77153ce
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.005.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+CREATE TRANSFORM FUNCTION transformTest5(ord) {
+    SELECT o.orderno, i.itemno, i.qty AS item_qty, i.price AS item_price
+      FROM [ord] AS o UNNEST o.items AS i
+      LIMIT 1
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
index 5db093b..de55877 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
@@ -14256,6 +14256,21 @@
         <output-dir compare="Text">drop_if_exists</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="function/transform">
+      <compilation-unit name="negative">
+        <output-dir compare="Text">negative</output-dir>
+        <expected-error>ASX1223: Failed to create transform function. Encountered error: 'Transform function should have exactly one parameter'</expected-error>
+        <expected-error>ASX1223: Failed to create transform function. Encountered error: 'Transform function should have exactly one parameter'</expected-error>
+        <expected-error>ASX1223: Failed to create transform function. Encountered error: 'Transform function definition can not use collections/views'</expected-error>
+        <expected-error>ASX1223: Failed to create transform function. Encountered error: 'Transform function definition can not use collections/views'</expected-error>
+        <expected-error>ASX1223: Failed to create transform function. Encountered error: 'Transform function cannot return more than one row'</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="function/transform">
+      <compilation-unit name="positive">
+        <output-dir compare="Text">positive</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="feeds">
     <test-case FilePath="feeds">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 463695e..260617a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -328,6 +328,7 @@
     CANNOT_TRUNCATE_DATASET_TYPE(1220),
     NO_VALID_AUTHENTICATION_PARAMS_PROVIDED(1221),
     NO_VALID_AUTHENTICATION_PARAMS_PROVIDED_TO_IMPERSONATE_SERVICE_ACCOUNT(1222),
+    INVALID_TRANSFORM_FUNCTION(1223),
 
     // Feed errors
     DATAFLOW_ILLEGAL_STATE(3001),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index df08016..68c23c7 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -330,6 +330,7 @@
 1220 = Cannot truncate %1$s '%2$s'
 1221 = No valid authentication parameters were provided
 1222 = No valid authentication parameters were provided to impersonate service account
+1223 = Failed to create transform function. Encountered error: '%1$s'
 
 # Feed Errors
 3001 = Illegal state.
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
index e0623a1..74b7bbd 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateFunctionStatement.java
@@ -63,9 +63,11 @@
 
     private final boolean replaceIfExists;
     private final boolean ifNotExists;
+    private final boolean transform;
 
     public CreateFunctionStatement(FunctionSignature signature, List<Pair<VarIdentifier, TypeExpression>> paramList,
-            String functionBody, Expression functionBodyExpression, boolean replaceIfExists, boolean ifNotExists) {
+            String functionBody, Expression functionBodyExpression, boolean replaceIfExists, boolean ifNotExists,
+            boolean transform) {
         this.signature = signature;
         this.functionBody = functionBody;
         this.functionBodyExpression = functionBodyExpression;
@@ -77,6 +79,7 @@
         this.options = null;
         this.replaceIfExists = replaceIfExists;
         this.ifNotExists = ifNotExists;
+        this.transform = transform;
     }
 
     public CreateFunctionStatement(FunctionSignature signature, List<Pair<VarIdentifier, TypeExpression>> paramList,
@@ -93,6 +96,7 @@
         this.functionBodyExpression = null;
         this.replaceIfExists = replaceIfExists;
         this.ifNotExists = ifNotExists;
+        this.transform = false;
     }
 
     public boolean getReplaceIfExists() {
@@ -196,6 +200,10 @@
         return Category.DDL;
     }
 
+    public boolean isTransform() {
+        return transform;
+    }
+
     private IAdmNode getOption(String optionName) {
         return options != null ? options.get(optionName) : null;
     }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/FunctionDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/FunctionDecl.java
index 2ef11ad..1a924ee 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/FunctionDecl.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/FunctionDecl.java
@@ -34,13 +34,20 @@
     private Expression funcBody;
     private Expression funcBodyNormalized;
     private final boolean isStored;
+    private final boolean transform;
 
     public FunctionDecl(FunctionSignature signature, List<VarIdentifier> paramList, Expression funcBody,
-            boolean isStored) {
+            boolean isStored, boolean transform) {
         this.signature = signature;
         this.paramList = paramList;
         this.funcBody = funcBody;
         this.isStored = isStored;
+        this.transform = transform;
+    }
+
+    public FunctionDecl(FunctionSignature signature, List<VarIdentifier> paramList, Expression funcBody,
+            boolean isStored) {
+        this(signature, paramList, funcBody, isStored, false);
     }
 
     public FunctionSignature getSignature() {
@@ -72,6 +79,10 @@
         return isStored;
     }
 
+    public boolean isTransform() {
+        return transform;
+    }
+
     @Override
     public int hashCode() {
         return signature.hashCode();
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java
index 06f22b7..4f09722 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java
@@ -179,7 +179,8 @@
         }
 
         Pair<ILangExpression, VariableSubstitutionEnvironment> p1 = fd.getFuncBody().accept(this, env);
-        FunctionDecl newF = new FunctionDecl(fd.getSignature(), newList, (Expression) p1.first, fd.isStored());
+        FunctionDecl newF =
+                new FunctionDecl(fd.getSignature(), newList, (Expression) p1.first, fd.isStored(), fd.isTransform());
         newF.setSourceLocation(fd.getSourceLocation());
         return new Pair<>(newF, env);
     }
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java
index 1d43d0b..9941dde 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java
@@ -267,7 +267,7 @@
     @Override
     public FunctionDecl visit(FunctionDecl fd, Void arg) throws CompilationException {
         FunctionDecl copy = new FunctionDecl(fd.getSignature(), fd.getParamList(),
-                (Expression) fd.getFuncBody().accept(this, arg), fd.isStored());
+                (Expression) fd.getFuncBody().accept(this, arg), fd.isStored(), fd.isTransform());
         copy.setSourceLocation(fd.getSourceLocation());
         return copy;
     }
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 0c95c77..13272e9 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -1865,13 +1865,18 @@
   CreateFunctionStatement stmt = null;
 }
 {
-  <FUNCTION> stmt = FunctionSpecification(startStmtToken, orReplace)
+  <TRANSFORM> <FUNCTION> stmt = FunctionSpecification(startStmtToken, orReplace, true)
+  {
+    return stmt;
+  }
+  |
+  <FUNCTION> stmt = FunctionSpecification(startStmtToken, orReplace, false)
   {
     return stmt;
   }
 }
 
-CreateFunctionStatement FunctionSpecification(Token startStmtToken, boolean orReplace) throws ParseException:
+CreateFunctionStatement FunctionSpecification(Token startStmtToken, boolean orReplace, boolean transform) throws ParseException:
 {
   FunctionSignature signature = null;
   FunctionName fctName = null;
@@ -1926,7 +1931,7 @@
         getCurrentScope().addFunctionDescriptor(signature, false);
         removeCurrentScope();
         ensureNoTypeDeclsInFunction(fctName.function, params, returnType, startStmtToken);
-        stmt = new CreateFunctionStatement(signature, params, functionBody, functionBodyExpr, orReplace, ifNotExists);
+        stmt = new CreateFunctionStatement(signature, params, functionBody, functionBodyExpr, orReplace, ifNotExists, transform);
       }
     )
   |
@@ -6114,6 +6119,7 @@
   | <WITH : "with">
   | <WRITE : "write">
   | <COPY : "copy">
+  | <TRANSFORM : "transform">
 }
 
 <DEFAULT,IN_DBL_BRACE>
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 25d43b2..a60e53b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -689,12 +689,6 @@
             // in the cache.
             return null;
         }
-        //TODO(DB): review this and other similar ones
-        if (ctx.getDataverse(functionSignature.getDatabaseName(), functionSignature.getDataverseName()) != null) {
-            // This transaction has dropped and subsequently created the same
-            // dataverse.
-            return null;
-        }
         function = cache.getFunction(functionSignature);
         if (function != null) {
             // Function is already in the cache, don't add it again.
@@ -1348,6 +1342,15 @@
         INSTANCE = new NCMetadataManagerImpl(proxies, metadataNode);
     }
 
+    @Override
+    public List<Dataset> getAllDatasets(MetadataTransactionContext ctx) throws AlgebricksException {
+        try {
+            return metadataNode.getAllDatasets(ctx.getTxnId());
+        } catch (RemoteException e) {
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+        }
+    }
+
     private static class CCMetadataManagerImpl extends MetadataManager {
         private final MetadataProperties metadataProperties;
         private final ICcApplicationContext appCtx;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index 70bf83a..8db32d6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -203,7 +203,7 @@
 
     public void dropFunction(FunctionSignature signature) {
         Function function = new Function(signature, null, null, null, null, null, null, null, null, null, null, false,
-                false, null, null, null);
+                false, null, null, null, false);
         droppedCache.addFunctionIfNotExists(function);
         logAndApply(new MetadataLogicalOperation(function, false));
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index fdd36de..4ce8742 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -919,4 +919,6 @@
             String feedName) throws AlgebricksException;
 
     long getMaxTxnId();
+
+    List<Dataset> getAllDatasets(MetadataTransactionContext ctx) throws AlgebricksException;
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index ee4a2c9..bc6789c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -1017,4 +1017,6 @@
 
     List<FeedConnection> getFeedConnections(TxnId txnId, String database, DataverseName dataverseName, String feedName)
             throws AlgebricksException, RemoteException;
+
+    List<Dataset> getAllDatasets(TxnId txnId) throws AlgebricksException, RemoteException;
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 338e00d..31a7f9b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -53,6 +53,7 @@
     public static final String FIELD_NAME_DEFAULT = "Default";
     public static final String FIELD_NAME_DEFINITION = "Definition";
     public static final String FIELD_NAME_DEPENDENCIES = "Dependencies";
+    public static final String FIELD_NAME_IS_TRANSFORM = "IsTransform";
     public static final String FIELD_NAME_DERIVED = "Derived";
     public static final String FIELD_NAME_DESCRIPTION = "Description";
     public static final String FIELD_NAME_EXTERNAL_DETAILS = "ExternalDetails";
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f11a338..63eda21 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -2033,4 +2033,8 @@
             off += Character.charCount(codePointChar);
         }
     }
+
+    public List<Dataset> getAllDatasets() throws AlgebricksException {
+        return MetadataManager.INSTANCE.getAllDatasets(mdTxnCtx);
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
index 3c1515b..c19b542 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Function.java
@@ -51,12 +51,13 @@
     private final Map<String, String> resources;
     private final List<List<DependencyFullyQualifiedName>> dependencies;
     private final Creator creator;
+    private final boolean transform;
 
     public Function(FunctionSignature signature, List<String> paramNames, List<TypeSignature> paramTypes,
             TypeSignature returnType, String functionBody, String functionKind, String language,
             String libraryDatabaseName, DataverseName libraryDataverseName, String libraryName,
             List<String> externalIdentifier, Boolean nullCall, Boolean deterministic, Map<String, String> resources,
-            List<List<DependencyFullyQualifiedName>> dependencies, Creator creator) {
+            List<List<DependencyFullyQualifiedName>> dependencies, Creator creator, boolean transform) {
         this.signature = signature;
         this.paramNames = paramNames;
         this.paramTypes = paramTypes;
@@ -75,6 +76,7 @@
                 ? Arrays.asList(Collections.emptyList(), Collections.emptyList(), Collections.emptyList())
                 : dependencies;
         this.creator = creator;
+        this.transform = transform;
     }
 
     public FunctionSignature getSignature() {
@@ -168,6 +170,10 @@
         return creator;
     }
 
+    public boolean isTransform() {
+        return transform;
+    }
+
     @Override
     public Function addToCache(MetadataCache cache) {
         return cache.addFunctionIfNotExists(this);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FunctionTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FunctionTupleTranslator.java
index 2741d12..2319cb0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FunctionTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/FunctionTupleTranslator.java
@@ -206,9 +206,14 @@
         FunctionSignature signature = new FunctionSignature(databaseName, dataverseName, functionName, arity);
         Creator creator = Creator.createOrDefault(functionRecord);
 
+        int isTransformIndex = functionRecord.getType().getFieldIndex(MetadataRecordTypes.FIELD_NAME_IS_TRANSFORM);
+        boolean transform = false;
+        if (isTransformIndex >= 0) {
+            transform = ((ABoolean) functionRecord.getValueByPos(isTransformIndex)).getBoolean();
+        }
         return new Function(signature, paramNames, paramTypes, returnType, definition, functionKind, language,
                 libraryDatabaseName, libraryDataverseName, libraryName, externalIdentifier, nullCall, deterministic,
-                resources, dependencies, creator);
+                resources, dependencies, creator, transform);
     }
 
     private List<TypeSignature> getParamTypes(ARecord functionRecord, String functionDatabaseName,
@@ -435,6 +440,7 @@
         writeNullCall(function);
         writeDeterministic(function);
         writeFunctionCreator(function);
+        writeIsTransform(function);
     }
 
     protected void writeResources(Function function) throws HyracksDataException {
@@ -722,4 +728,15 @@
             recordBuilder.addField(fieldName, fieldValue);
         }
     }
+
+    private void writeIsTransform(Function function) throws HyracksDataException {
+        if (function.isTransform()) {
+            fieldName.reset();
+            aString.setValue(MetadataRecordTypes.FIELD_NAME_IS_TRANSFORM);
+            stringSerde.serialize(aString, fieldName.getDataOutput());
+            fieldValue.reset();
+            booleanSerde.serialize(ABoolean.TRUE, fieldValue.getDataOutput());
+            recordBuilder.addField(fieldName, fieldValue);
+        }
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
index 1701e64..005bdf5 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
@@ -69,6 +69,18 @@
             this.funID = funID;
         }
 
+        public IScalarEvaluatorFactory getListEvalFactory() {
+            return listEvalFactory;
+        }
+
+        public SourceLocation getSourceLoc() {
+            return sourceLoc;
+        }
+
+        public FunctionIdentifier getFunID() {
+            return funID;
+        }
+
         @Override
         public IUnnestingEvaluator createUnnestingEvaluator(IEvaluatorContext ctx) throws HyracksDataException {
             return new IUnnestingEvaluator() {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index 6b25b1f..21d9ea2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -84,15 +84,15 @@
  * 5. the cardinality is some unknown value.
  */
 public class CardinalityInferenceVisitor implements ILogicalOperatorVisitor<Long, Void> {
-    private static final long ZERO_OR_ONE = 0L;
-    private static final long ONE = 1L;
-    private static final long ZERO_OR_ONE_GROUP = 2L;
-    private static final long ONE_GROUP = 3L;
-    private static final long UNKNOWN = 100L; // so it fits into the auto-boxing cache
+    protected static final long ZERO_OR_ONE = 0L;
+    protected static final long ONE = 1L;
+    protected static final long ZERO_OR_ONE_GROUP = 2L;
+    protected static final long ONE_GROUP = 3L;
+    protected static final long UNKNOWN = 100L; // so it fits into the auto-boxing cache
 
     private final Set<LogicalVariable> keyVariables = new HashSet<>();
 
-    private CardinalityInferenceVisitor() {
+    public CardinalityInferenceVisitor() {
     }
 
     @Override
@@ -379,7 +379,7 @@
     }
 
     // For operators including SELECT and LIMIT.
-    private long adjustCardinalityForTupleReductionOperator(long inputCardinality) {
+    protected long adjustCardinalityForTupleReductionOperator(long inputCardinality) {
         if (inputCardinality == ONE) {
             return ZERO_OR_ONE;
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 0c74260..9b643cd 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.dataflow.common.comm.io.AbstractFrameAppender;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -38,10 +39,12 @@
     protected IFrame frame;
     protected FrameTupleAccessor tAccess;
     protected FrameTupleReference tRef;
+    protected boolean ignoreFailures = false;
 
     protected final void initAccessAppend(IHyracksTaskContext ctx) throws HyracksDataException {
         frame = new VSizeFrame(ctx);
         appender = new FrameTupleAppender(frame);
+        ((AbstractFrameAppender) appender).setIgnoreFailures(ignoreFailures);
         tAccess = new FrameTupleAccessor(inputRecordDesc);
     }
 
@@ -59,8 +62,10 @@
         try {
             flushIfNotFailed();
         } catch (Exception e) {
-            closeException = e;
-            fail(closeException);
+            if (!ignoreFailures) {
+                closeException = e;
+                fail(closeException);
+            }
         } finally {
             closeException = CleanupUtils.close(writer, closeException);
         }
@@ -115,4 +120,15 @@
             throws HyracksDataException {
         FrameUtils.appendConcatToWriter(writer, getTupleAppender(), accessor0, tIndex0, accessor1, tIndex1);
     }
+
+    public void setIgnoreFailures(boolean ignoreFailures) {
+        this.ignoreFailures = ignoreFailures;
+        if (appender != null) {
+            ((AbstractFrameAppender) appender).setIgnoreFailures(ignoreFailures);
+        }
+    }
+
+    public boolean isIgnoreFailures() {
+        return ignoreFailures;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 202c087..c2e8473 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -46,15 +47,23 @@
     private final int outputArity;
     private final AlgebricksPipeline pipeline;
     private final Map<IPushRuntimeFactory, IPushRuntime[]> runtimeMap;
+    private final boolean ignoreFailures;
 
     public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity,
             RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor) {
+        this(pipeline, inputArity, outputArity, pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor, false);
+    }
+
+    public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity,
+            RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor,
+            boolean ignoreFailures) {
         this.pipeline = pipeline;
         this.pipelineInputRecordDescriptor = pipelineInputRecordDescriptor;
         this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor;
         this.inputArity = inputArity;
         this.outputArity = outputArity;
         this.runtimeMap = new HashMap<>();
+        this.ignoreFailures = ignoreFailures;
     }
 
     public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException {
@@ -99,6 +108,9 @@
                 } else {
                     newRuntimes[j].setOutputFrameWriter(0, start, recordDescriptors[i]);
                 }
+                if (newRuntimes[j] instanceof AbstractOneInputOneOutputOneFramePushRuntime) {
+                    ((AbstractOneInputOneOutputOneFramePushRuntime) newRuntimes[j]).setIgnoreFailures(ignoreFailures);
+                }
             }
             runtimeMap.put(runtimeFactory, newRuntimes);
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 7feca3c..242c603 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -90,6 +90,22 @@
         return pipelines;
     }
 
+    public RecordDescriptor getInputRecordDesc() {
+        return inputRecordDesc;
+    }
+
+    public RecordDescriptor getOutputRecordDesc() {
+        return outputRecordDesc;
+    }
+
+    public IMissingWriterFactory[] getMissingWriterFactories() {
+        return missingWriterFactories;
+    }
+
+    public Map<IPushRuntimeFactory, IOperatorStats> getStats() {
+        return stats;
+    }
+
     public void setStats(Map<IPushRuntimeFactory, IOperatorStats> stats) {
         this.stats.putAll(stats);
     }
@@ -97,20 +113,20 @@
     @Override
     public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
             throws HyracksDataException {
-        return new SubplanPushRuntime(ctx);
+        return new SubplanPushRuntime(ctx, false);
     }
 
-    private class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+    public class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
 
-        final IHyracksTaskContext ctx;
+        protected final IHyracksTaskContext ctx;
 
-        final NestedTupleSourceRuntime[] startOfPipelines;
+        protected final NestedTupleSourceRuntime[] startOfPipelines;
 
         boolean first;
 
         boolean profile;
 
-        SubplanPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+        protected SubplanPushRuntime(IHyracksTaskContext ctx, boolean ignoreFailures) throws HyracksDataException {
             this.ctx = ctx;
             this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
             this.first = true;
@@ -150,7 +166,8 @@
                     outputRecordDescriptor = pipelineLastRecordDescriptor;
                 }
 
-                PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor);
+                PipelineAssembler pa =
+                        new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor, ignoreFailures);
                 IFrameWriter head = pa.assemblePipeline(outputWriter, ctx, stats);
                 startOfPipelines[i] = (NestedTupleSourceRuntime) head;
                 pipelineAssemblers[i] = pa;
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 0874f2b..9cfc070 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -52,8 +52,8 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final int outCol;
-    private final int positionalCol;
+    protected final int outCol;
+    protected final int positionalCol;
     private final IUnnestingEvaluatorFactory unnestingFactory;
     private final IUnnestingPositionWriterFactory positionWriterFactory;
     private final boolean leftOuter;
@@ -76,6 +76,26 @@
         this.missingWriterFactory = missingWriterFactory;
     }
 
+    public int[] getProjectionList() {
+        return projectionList;
+    }
+
+    public int getOutCol() {
+        return outCol;
+    }
+
+    public int getPositionalCol() {
+        return positionalCol;
+    }
+
+    public IUnnestingEvaluatorFactory getUnnestingFactory() {
+        return unnestingFactory;
+    }
+
+    public IUnnestingPositionWriterFactory getPositionWriterFactory() {
+        return positionWriterFactory;
+    }
+
     @Override
     public String toString() {
         return "unnest " + outCol + (positionalCol >= 0 ? " at " + positionalCol : "") + " <- " + unnestingFactory
@@ -85,92 +105,102 @@
     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
             throws HyracksDataException {
-        ByteArrayAccessibleOutputStream missingBytes = leftOuter ? writeMissingBytes() : null;
-        IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
-        return new AbstractOneInputOneOutputOneFramePushRuntime() {
-            private IPointable p = VoidPointable.FACTORY.createPointable();
-            private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
-            private IUnnestingEvaluator unnest = unnestingFactory.createUnnestingEvaluator(evalCtx);
-            private final IUnnestingPositionWriter positionWriter =
-                    positionWriterFactory != null ? positionWriterFactory.createUnnestingPositionWriter() : null;
-
-            @Override
-            public void open() throws HyracksDataException {
-                super.open();
-                if (tRef == null) {
-                    initAccessAppendRef(ctx);
-                }
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                tAccess.reset(buffer);
-                int nTuple = tAccess.getTupleCount();
-                for (int t = 0; t < nTuple; t++) {
-                    tRef.reset(tAccess, t);
-                    try {
-                        unnest.init(tRef);
-                        unnesting(t);
-                    } catch (IOException ae) {
-                        throw HyracksDataException.create(ae);
-                    }
-                }
-            }
-
-            private void unnesting(int t) throws IOException {
-                // Assumes that when unnesting the tuple, each step() call for each element
-                // in the tuple will increase the positionIndex, and the positionIndex will
-                // be reset when a new tuple is to be processed.
-                int positionIndex = 1;
-                boolean emitted = false;
-                do {
-                    if (!unnest.step(p)) {
-                        break;
-                    }
-                    writeOutput(t, positionIndex++, false);
-                    emitted = true;
-                } while (true);
-                if (leftOuter && !emitted) {
-                    writeOutput(t, -1, true);
-                }
-            }
-
-            private void writeOutput(int t, int positionIndex, boolean missing)
-                    throws HyracksDataException, IOException {
-                tupleBuilder.reset();
-                for (int f = 0; f < projectionList.length; f++) {
-                    int col = projectionList[f];
-                    if (col == outCol) {
-                        if (missing) {
-                            tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
-                        } else {
-                            tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
-                        }
-                    } else if (col == positionalCol) {
-                        if (missing) {
-                            tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
-                        } else {
-                            positionWriter.write(tupleBuilder.getDataOutput(), positionIndex);
-                            tupleBuilder.addFieldEndOffset();
-                        }
-                    } else {
-                        tupleBuilder.addField(tAccess, t, projectionList[f]);
-                    }
-                }
-                appendToFrameFromTupleBuilder(tupleBuilder);
-            }
-
-            @Override
-            public void flush() throws HyracksDataException {
-                appender.flush(writer);
-            }
-        };
+        return new UnnestPushRuntime(ctx);
     }
 
-    private ByteArrayAccessibleOutputStream writeMissingBytes() throws HyracksDataException {
+    public class UnnestPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+        private IPointable p = VoidPointable.FACTORY.createPointable();
+        protected ArrayTupleBuilder tupleBuilder;
+        protected IUnnestingEvaluator unnest;
+        private final IUnnestingPositionWriter positionWriter;
+        private final IHyracksTaskContext ctx;
+        protected ByteArrayAccessibleOutputStream missingBytes;
+
+        public UnnestPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+            this.ctx = ctx;
+            IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
+            unnest = unnestingFactory.createUnnestingEvaluator(evalCtx);
+            tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+            positionWriter =
+                    positionWriterFactory != null ? positionWriterFactory.createUnnestingPositionWriter() : null;
+            missingBytes = leftOuter ? writeMissingBytes() : null;
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            super.open();
+            if (tRef == null) {
+                initAccessAppendRef(ctx);
+            }
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            tAccess.reset(buffer);
+            int nTuple = tAccess.getTupleCount();
+            for (int t = 0; t < nTuple; t++) {
+                tRef.reset(tAccess, t);
+                try {
+                    unnest.init(tRef);
+                    unnesting(t);
+                } catch (IOException ae) {
+                    throw HyracksDataException.create(ae);
+                }
+            }
+        }
+
+        protected void unnesting(int t) throws IOException {
+            // Assumes that when unnesting the tuple, each step() call for each element
+            // in the tuple will increase the positionIndex, and the positionIndex will
+            // be reset when a new tuple is to be processed.
+            int positionIndex = 1;
+            boolean emitted = false;
+            do {
+                if (!unnest.step(p)) {
+                    break;
+                }
+                writeOutput(t, positionIndex++, false);
+                emitted = true;
+            } while (true);
+            if (leftOuter && !emitted) {
+                writeOutput(t, -1, true);
+            }
+        }
+
+        private void writeOutput(int t, int positionIndex, boolean missing) throws HyracksDataException, IOException {
+            tupleBuilder.reset();
+            for (int f = 0; f < projectionList.length; f++) {
+                int col = projectionList[f];
+                if (col == outCol) {
+                    if (missing) {
+                        tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
+                    } else {
+                        tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+                    }
+                } else if (col == positionalCol) {
+                    if (missing) {
+                        tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
+                    } else {
+                        positionWriter.write(tupleBuilder.getDataOutput(), positionIndex);
+                        tupleBuilder.addFieldEndOffset();
+                    }
+                } else {
+                    tupleBuilder.addField(tAccess, t, projectionList[f]);
+                }
+            }
+            appendToFrameFromTupleBuilder(tupleBuilder);
+        }
+
+        @Override
+        public void flush() throws HyracksDataException {
+            appender.flush(writer);
+        }
+    };
+
+    protected ByteArrayAccessibleOutputStream writeMissingBytes() throws HyracksDataException {
         ByteArrayAccessibleOutputStream baos = new ByteArrayAccessibleOutputStream();
         IMissingWriter missingWriter = missingWriterFactory.createMissingWriter();
         missingWriter.writeMissing(new DataOutputStream(baos));
         return baos;
     }
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index e1acaee..7338cc7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -48,6 +48,7 @@
 
     protected int tupleCount;
     protected int tupleDataEndOffset;
+    protected boolean ignoreFailures = false;
 
     @Override
     public void reset(IFrame frame, boolean clear) throws HyracksDataException {
@@ -91,10 +92,21 @@
     public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
         failIfInterrupted();
         getBuffer().clear();
-        outWriter.nextFrame(getBuffer());
-        if (clearFrame) {
-            frame.reset();
-            reset(getBuffer(), true);
+        if (!ignoreFailures) {
+            outWriter.nextFrame(getBuffer());
+            if (clearFrame) {
+                frame.reset();
+                reset(getBuffer(), true);
+            }
+        } else {
+            try {
+                outWriter.nextFrame(getBuffer());
+            } finally {
+                if (clearFrame) {
+                    frame.reset();
+                    reset(getBuffer(), true);
+                }
+            }
         }
     }
 
@@ -125,4 +137,12 @@
             throw HyracksDataException.create(new InterruptedException());
         }
     }
+
+    public void setIgnoreFailures(boolean ignoreFailures) {
+        this.ignoreFailures = ignoreFailures;
+    }
+
+    public boolean isIgnoreFailures() {
+        return ignoreFailures;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 3ef8b28..58194f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -299,5 +299,4 @@
         }
         return false;
     }
-
 }