[ASTERIXDB-3603][FUN] Runtime changes for transform functions

- user model changes: no
- storage format changes: no
- interface changes: yes

Ext-ref: MB-63039
Change-Id: I0b23d88db632d90aedec409db816c66a5a688daf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19744
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 3bb871b..6d37447 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -488,6 +488,16 @@
         return isFileStore ? String.valueOf(ExternalWriterProvider.getSeparator(adapter)) : "";
     }
 
+    private LogicalVariable getUnnestVar(ILogicalOperator op) {
+        while (op.getOperatorTag() != LogicalOperatorTag.UNNEST && !op.getInputs().isEmpty()) {
+            op = op.getInputs().get(0).getValue();
+        }
+        if (op.getOperatorTag() == LogicalOperatorTag.UNNEST) {
+            return ((UnnestOperator) op).getVariable();
+        }
+        return null;
+    }
+
     public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt,
             ILogicalOperator baseOp, IResultMetadata resultMetadata) throws AlgebricksException {
         MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
@@ -500,8 +510,11 @@
         ILogicalOperator topOp = p.first;
         List<LogicalVariable> liveVars = new ArrayList<>();
         VariableUtilities.getLiveVariables(topOp, liveVars);
-        LogicalVariable unnestVar = liveVars.get(0);
-        LogicalVariable resVar = unnestVar;
+        LogicalVariable unnestVar = getUnnestVar(topOp);
+        if (unnestVar == null) {
+            unnestVar = liveVars.get(0);
+        }
+        LogicalVariable resVar = liveVars.get(0);
 
         if (outputDatasetName == null) {
             FileSplit outputFileSplit = metadataProvider.getOutputFile();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
index 0ab3ace..f03a74d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
@@ -18,8 +18,6 @@
  */
 use test;
 
--- Fail: More than one argument
-
 CREATE TRANSFORM FUNCTION transformTest1(doc) {
     SELECT count(*) as count FROM [doc] t UNNEST t.a
 };
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
index 6d55991..a04ea00 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
@@ -18,8 +18,6 @@
  */
 use test;
 
--- Fail: Less than one argument
-
 CREATE TRANSFORM FUNCTION transformTest2(doc) {
     SELECT (SELECT * FROM [doc] t UNNEST t.a) as tt
 };
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
index c38bd72..0770d98 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
@@ -18,8 +18,6 @@
  */
 use test;
 
--- Fail: Using a collection in the definition
-
 CREATE TRANSFORM FUNCTION transformTest3(doc) {
     SELECT * FROM [doc] t UNNEST t.a LIMIT 1
 };
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
index 10cfb4e..d2c2265 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
@@ -18,8 +18,6 @@
  */
 use test;
 
--- Fail: Using a view in the definition
-
 CREATE TRANSFORM FUNCTION transformTest4(cust) {
     SELECT VALUE c
       FROM [cust] AS c
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 25d43b2..a60e53b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -689,12 +689,6 @@
             // in the cache.
             return null;
         }
-        //TODO(DB): review this and other similar ones
-        if (ctx.getDataverse(functionSignature.getDatabaseName(), functionSignature.getDataverseName()) != null) {
-            // This transaction has dropped and subsequently created the same
-            // dataverse.
-            return null;
-        }
         function = cache.getFunction(functionSignature);
         if (function != null) {
             // Function is already in the cache, don't add it again.
@@ -1348,6 +1342,15 @@
         INSTANCE = new NCMetadataManagerImpl(proxies, metadataNode);
     }
 
+    @Override
+    public List<Dataset> getAllDatasets(MetadataTransactionContext ctx) throws AlgebricksException {
+        try {
+            return metadataNode.getAllDatasets(ctx.getTxnId());
+        } catch (RemoteException e) {
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+        }
+    }
+
     private static class CCMetadataManagerImpl extends MetadataManager {
         private final MetadataProperties metadataProperties;
         private final ICcApplicationContext appCtx;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index fdd36de..4ce8742 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -919,4 +919,6 @@
             String feedName) throws AlgebricksException;
 
     long getMaxTxnId();
+
+    List<Dataset> getAllDatasets(MetadataTransactionContext ctx) throws AlgebricksException;
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index ee4a2c9..bc6789c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -1017,4 +1017,6 @@
 
     List<FeedConnection> getFeedConnections(TxnId txnId, String database, DataverseName dataverseName, String feedName)
             throws AlgebricksException, RemoteException;
+
+    List<Dataset> getAllDatasets(TxnId txnId) throws AlgebricksException, RemoteException;
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f11a338..63eda21 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -2033,4 +2033,8 @@
             off += Character.charCount(codePointChar);
         }
     }
+
+    public List<Dataset> getAllDatasets() throws AlgebricksException {
+        return MetadataManager.INSTANCE.getAllDatasets(mdTxnCtx);
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
index 1701e64..005bdf5 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
@@ -69,6 +69,18 @@
             this.funID = funID;
         }
 
+        public IScalarEvaluatorFactory getListEvalFactory() {
+            return listEvalFactory;
+        }
+
+        public SourceLocation getSourceLoc() {
+            return sourceLoc;
+        }
+
+        public FunctionIdentifier getFunID() {
+            return funID;
+        }
+
         @Override
         public IUnnestingEvaluator createUnnestingEvaluator(IEvaluatorContext ctx) throws HyracksDataException {
             return new IUnnestingEvaluator() {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 0c74260..9b643cd 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.dataflow.common.comm.io.AbstractFrameAppender;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -38,10 +39,12 @@
     protected IFrame frame;
     protected FrameTupleAccessor tAccess;
     protected FrameTupleReference tRef;
+    protected boolean ignoreFailures = false;
 
     protected final void initAccessAppend(IHyracksTaskContext ctx) throws HyracksDataException {
         frame = new VSizeFrame(ctx);
         appender = new FrameTupleAppender(frame);
+        ((AbstractFrameAppender) appender).setIgnoreFailures(ignoreFailures);
         tAccess = new FrameTupleAccessor(inputRecordDesc);
     }
 
@@ -59,8 +62,10 @@
         try {
             flushIfNotFailed();
         } catch (Exception e) {
-            closeException = e;
-            fail(closeException);
+            if (!ignoreFailures) {
+                closeException = e;
+                fail(closeException);
+            }
         } finally {
             closeException = CleanupUtils.close(writer, closeException);
         }
@@ -115,4 +120,15 @@
             throws HyracksDataException {
         FrameUtils.appendConcatToWriter(writer, getTupleAppender(), accessor0, tIndex0, accessor1, tIndex1);
     }
+
+    public void setIgnoreFailures(boolean ignoreFailures) {
+        this.ignoreFailures = ignoreFailures;
+        if (appender != null) {
+            ((AbstractFrameAppender) appender).setIgnoreFailures(ignoreFailures);
+        }
+    }
+
+    public boolean isIgnoreFailures() {
+        return ignoreFailures;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 202c087..c2e8473 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -46,15 +47,23 @@
     private final int outputArity;
     private final AlgebricksPipeline pipeline;
     private final Map<IPushRuntimeFactory, IPushRuntime[]> runtimeMap;
+    private final boolean ignoreFailures;
 
     public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity,
             RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor) {
+        this(pipeline, inputArity, outputArity, pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor, false);
+    }
+
+    public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity,
+            RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor,
+            boolean ignoreFailures) {
         this.pipeline = pipeline;
         this.pipelineInputRecordDescriptor = pipelineInputRecordDescriptor;
         this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor;
         this.inputArity = inputArity;
         this.outputArity = outputArity;
         this.runtimeMap = new HashMap<>();
+        this.ignoreFailures = ignoreFailures;
     }
 
     public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException {
@@ -99,6 +108,9 @@
                 } else {
                     newRuntimes[j].setOutputFrameWriter(0, start, recordDescriptors[i]);
                 }
+                if (newRuntimes[j] instanceof AbstractOneInputOneOutputOneFramePushRuntime) {
+                    ((AbstractOneInputOneOutputOneFramePushRuntime) newRuntimes[j]).setIgnoreFailures(ignoreFailures);
+                }
             }
             runtimeMap.put(runtimeFactory, newRuntimes);
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 7feca3c..242c603 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -90,6 +90,22 @@
         return pipelines;
     }
 
+    public RecordDescriptor getInputRecordDesc() {
+        return inputRecordDesc;
+    }
+
+    public RecordDescriptor getOutputRecordDesc() {
+        return outputRecordDesc;
+    }
+
+    public IMissingWriterFactory[] getMissingWriterFactories() {
+        return missingWriterFactories;
+    }
+
+    public Map<IPushRuntimeFactory, IOperatorStats> getStats() {
+        return stats;
+    }
+
     public void setStats(Map<IPushRuntimeFactory, IOperatorStats> stats) {
         this.stats.putAll(stats);
     }
@@ -97,20 +113,20 @@
     @Override
     public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
             throws HyracksDataException {
-        return new SubplanPushRuntime(ctx);
+        return new SubplanPushRuntime(ctx, false);
     }
 
-    private class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+    public class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
 
-        final IHyracksTaskContext ctx;
+        protected final IHyracksTaskContext ctx;
 
-        final NestedTupleSourceRuntime[] startOfPipelines;
+        protected final NestedTupleSourceRuntime[] startOfPipelines;
 
         boolean first;
 
         boolean profile;
 
-        SubplanPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+        protected SubplanPushRuntime(IHyracksTaskContext ctx, boolean ignoreFailures) throws HyracksDataException {
             this.ctx = ctx;
             this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
             this.first = true;
@@ -150,7 +166,8 @@
                     outputRecordDescriptor = pipelineLastRecordDescriptor;
                 }
 
-                PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor);
+                PipelineAssembler pa =
+                        new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor, ignoreFailures);
                 IFrameWriter head = pa.assemblePipeline(outputWriter, ctx, stats);
                 startOfPipelines[i] = (NestedTupleSourceRuntime) head;
                 pipelineAssemblers[i] = pa;
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 0874f2b..9cfc070 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -52,8 +52,8 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final int outCol;
-    private final int positionalCol;
+    protected final int outCol;
+    protected final int positionalCol;
     private final IUnnestingEvaluatorFactory unnestingFactory;
     private final IUnnestingPositionWriterFactory positionWriterFactory;
     private final boolean leftOuter;
@@ -76,6 +76,26 @@
         this.missingWriterFactory = missingWriterFactory;
     }
 
+    public int[] getProjectionList() {
+        return projectionList;
+    }
+
+    public int getOutCol() {
+        return outCol;
+    }
+
+    public int getPositionalCol() {
+        return positionalCol;
+    }
+
+    public IUnnestingEvaluatorFactory getUnnestingFactory() {
+        return unnestingFactory;
+    }
+
+    public IUnnestingPositionWriterFactory getPositionWriterFactory() {
+        return positionWriterFactory;
+    }
+
     @Override
     public String toString() {
         return "unnest " + outCol + (positionalCol >= 0 ? " at " + positionalCol : "") + " <- " + unnestingFactory
@@ -85,92 +105,102 @@
     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
             throws HyracksDataException {
-        ByteArrayAccessibleOutputStream missingBytes = leftOuter ? writeMissingBytes() : null;
-        IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
-        return new AbstractOneInputOneOutputOneFramePushRuntime() {
-            private IPointable p = VoidPointable.FACTORY.createPointable();
-            private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
-            private IUnnestingEvaluator unnest = unnestingFactory.createUnnestingEvaluator(evalCtx);
-            private final IUnnestingPositionWriter positionWriter =
-                    positionWriterFactory != null ? positionWriterFactory.createUnnestingPositionWriter() : null;
-
-            @Override
-            public void open() throws HyracksDataException {
-                super.open();
-                if (tRef == null) {
-                    initAccessAppendRef(ctx);
-                }
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                tAccess.reset(buffer);
-                int nTuple = tAccess.getTupleCount();
-                for (int t = 0; t < nTuple; t++) {
-                    tRef.reset(tAccess, t);
-                    try {
-                        unnest.init(tRef);
-                        unnesting(t);
-                    } catch (IOException ae) {
-                        throw HyracksDataException.create(ae);
-                    }
-                }
-            }
-
-            private void unnesting(int t) throws IOException {
-                // Assumes that when unnesting the tuple, each step() call for each element
-                // in the tuple will increase the positionIndex, and the positionIndex will
-                // be reset when a new tuple is to be processed.
-                int positionIndex = 1;
-                boolean emitted = false;
-                do {
-                    if (!unnest.step(p)) {
-                        break;
-                    }
-                    writeOutput(t, positionIndex++, false);
-                    emitted = true;
-                } while (true);
-                if (leftOuter && !emitted) {
-                    writeOutput(t, -1, true);
-                }
-            }
-
-            private void writeOutput(int t, int positionIndex, boolean missing)
-                    throws HyracksDataException, IOException {
-                tupleBuilder.reset();
-                for (int f = 0; f < projectionList.length; f++) {
-                    int col = projectionList[f];
-                    if (col == outCol) {
-                        if (missing) {
-                            tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
-                        } else {
-                            tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
-                        }
-                    } else if (col == positionalCol) {
-                        if (missing) {
-                            tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
-                        } else {
-                            positionWriter.write(tupleBuilder.getDataOutput(), positionIndex);
-                            tupleBuilder.addFieldEndOffset();
-                        }
-                    } else {
-                        tupleBuilder.addField(tAccess, t, projectionList[f]);
-                    }
-                }
-                appendToFrameFromTupleBuilder(tupleBuilder);
-            }
-
-            @Override
-            public void flush() throws HyracksDataException {
-                appender.flush(writer);
-            }
-        };
+        return new UnnestPushRuntime(ctx);
     }
 
-    private ByteArrayAccessibleOutputStream writeMissingBytes() throws HyracksDataException {
+    public class UnnestPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+        private IPointable p = VoidPointable.FACTORY.createPointable();
+        protected ArrayTupleBuilder tupleBuilder;
+        protected IUnnestingEvaluator unnest;
+        private final IUnnestingPositionWriter positionWriter;
+        private final IHyracksTaskContext ctx;
+        protected ByteArrayAccessibleOutputStream missingBytes;
+
+        public UnnestPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+            this.ctx = ctx;
+            IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
+            unnest = unnestingFactory.createUnnestingEvaluator(evalCtx);
+            tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+            positionWriter =
+                    positionWriterFactory != null ? positionWriterFactory.createUnnestingPositionWriter() : null;
+            missingBytes = leftOuter ? writeMissingBytes() : null;
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            super.open();
+            if (tRef == null) {
+                initAccessAppendRef(ctx);
+            }
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            tAccess.reset(buffer);
+            int nTuple = tAccess.getTupleCount();
+            for (int t = 0; t < nTuple; t++) {
+                tRef.reset(tAccess, t);
+                try {
+                    unnest.init(tRef);
+                    unnesting(t);
+                } catch (IOException ae) {
+                    throw HyracksDataException.create(ae);
+                }
+            }
+        }
+
+        protected void unnesting(int t) throws IOException {
+            // Assumes that when unnesting the tuple, each step() call for each element
+            // in the tuple will increase the positionIndex, and the positionIndex will
+            // be reset when a new tuple is to be processed.
+            int positionIndex = 1;
+            boolean emitted = false;
+            do {
+                if (!unnest.step(p)) {
+                    break;
+                }
+                writeOutput(t, positionIndex++, false);
+                emitted = true;
+            } while (true);
+            if (leftOuter && !emitted) {
+                writeOutput(t, -1, true);
+            }
+        }
+
+        private void writeOutput(int t, int positionIndex, boolean missing) throws HyracksDataException, IOException {
+            tupleBuilder.reset();
+            for (int f = 0; f < projectionList.length; f++) {
+                int col = projectionList[f];
+                if (col == outCol) {
+                    if (missing) {
+                        tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
+                    } else {
+                        tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+                    }
+                } else if (col == positionalCol) {
+                    if (missing) {
+                        tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
+                    } else {
+                        positionWriter.write(tupleBuilder.getDataOutput(), positionIndex);
+                        tupleBuilder.addFieldEndOffset();
+                    }
+                } else {
+                    tupleBuilder.addField(tAccess, t, projectionList[f]);
+                }
+            }
+            appendToFrameFromTupleBuilder(tupleBuilder);
+        }
+
+        @Override
+        public void flush() throws HyracksDataException {
+            appender.flush(writer);
+        }
+    };
+
+    protected ByteArrayAccessibleOutputStream writeMissingBytes() throws HyracksDataException {
         ByteArrayAccessibleOutputStream baos = new ByteArrayAccessibleOutputStream();
         IMissingWriter missingWriter = missingWriterFactory.createMissingWriter();
         missingWriter.writeMissing(new DataOutputStream(baos));
         return baos;
     }
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index e1acaee..7338cc7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -48,6 +48,7 @@
 
     protected int tupleCount;
     protected int tupleDataEndOffset;
+    protected boolean ignoreFailures = false;
 
     @Override
     public void reset(IFrame frame, boolean clear) throws HyracksDataException {
@@ -91,10 +92,21 @@
     public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
         failIfInterrupted();
         getBuffer().clear();
-        outWriter.nextFrame(getBuffer());
-        if (clearFrame) {
-            frame.reset();
-            reset(getBuffer(), true);
+        if (!ignoreFailures) {
+            outWriter.nextFrame(getBuffer());
+            if (clearFrame) {
+                frame.reset();
+                reset(getBuffer(), true);
+            }
+        } else {
+            try {
+                outWriter.nextFrame(getBuffer());
+            } finally {
+                if (clearFrame) {
+                    frame.reset();
+                    reset(getBuffer(), true);
+                }
+            }
         }
     }
 
@@ -125,4 +137,12 @@
             throw HyracksDataException.create(new InterruptedException());
         }
     }
+
+    public void setIgnoreFailures(boolean ignoreFailures) {
+        this.ignoreFailures = ignoreFailures;
+    }
+
+    public boolean isIgnoreFailures() {
+        return ignoreFailures;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 3ef8b28..58194f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -299,5 +299,4 @@
         }
         return false;
     }
-
 }