diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
deleted file mode 100644
index 22d0d6b..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.api;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public interface ITupleForwarder {
-
-    public static final String FORWARD_POLICY = "forward-policy";
-
-    public enum TupleForwardPolicy {
-        FRAME_FULL,
-        COUNTER_TIMER_EXPIRED,
-        RATE_CONTROLLED,
-        FEED
-    }
-
-    public void initialize(IHyracksTaskContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
-
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
-
-    public void close() throws HyracksDataException;
-
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
index bbd93c2..a324496 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
@@ -19,16 +19,13 @@
 package org.apache.asterix.external.dataflow;
 
 import org.apache.asterix.external.api.IDataFlowController;
-import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public abstract class AbstractDataFlowController implements IDataFlowController {
 
-    protected final ITupleForwarder tupleForwarder;
     protected final IHyracksTaskContext ctx;
 
-    public AbstractDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder) {
+    public AbstractDataFlowController(IHyracksTaskContext ctx) {
         this.ctx = ctx;
-        this.tupleForwarder = tupleForwarder;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 3437de1..2533af5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -18,42 +18,31 @@
  */
 package org.apache.asterix.external.dataflow;
 
+import java.io.Closeable;
+import java.io.IOException;
+
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
-public abstract class AbstractFeedDataFlowController implements IDataFlowController {
-    protected final FeedTupleForwarder tupleForwarder;
+public abstract class AbstractFeedDataFlowController implements IDataFlowController, Closeable {
+    protected TupleForwarder tupleForwarder;
     protected final IHyracksTaskContext ctx;
     protected final int numOfFields;
     protected final ArrayTupleBuilder tb;
     protected final FeedLogManager feedLogManager;
     protected boolean flushing;
 
-    public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
-            FeedLogManager feedLogManager, int numOfFields) {
+    public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfFields) {
         this.feedLogManager = feedLogManager;
         this.numOfFields = numOfFields;
         this.ctx = ctx;
-        this.tupleForwarder = tupleForwarder;
         this.tb = new ArrayTupleBuilder(numOfFields);
     }
 
     @Override
-    public boolean pause() {
-        tupleForwarder.pause();
-        return true;
-    }
-
-    @Override
-    public boolean resume() {
-        tupleForwarder.resume();
-        return true;
-    }
-
-    @Override
     public void flush() throws HyracksDataException {
         flushing = true;
         tupleForwarder.flush();
@@ -61,4 +50,9 @@
     }
 
     public abstract String getStats();
+
+    @Override
+    public void close() throws IOException {
+        feedLogManager.close();
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index 2db17e2..b14722b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -30,11 +30,10 @@
 
     private final IRecordWithPKDataParser<T> dataParser;
 
-    public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
-            final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader)
-            throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+    public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedLogManager feedLogManager,
+            final int numOfOutputFields, final IRecordWithPKDataParser<T> dataParser,
+            final IRecordReader<T> recordReader) throws HyracksDataException {
+        super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader);
         this.dataParser = dataParser;
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index 4447b28..621397b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -28,11 +28,10 @@
 
 public class ChangeFeedWithMetaDataFlowController<T> extends FeedWithMetaDataFlowController<T> {
 
-    public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
-            final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader)
-            throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+    public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedLogManager feedLogManager,
+            final int numOfOutputFields, final IRecordWithMetadataParser<T> dataParser,
+            final IRecordReader<T> recordReader) throws HyracksDataException {
+        super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
deleted file mode 100644
index 10815d9..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataflow;
-
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class CounterTimerTupleForwarder implements ITupleForwarder {
-
-    public static final String BATCH_SIZE = "batch-size";
-    public static final String BATCH_INTERVAL = "batch-interval";
-
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    private FrameTupleAppender appender;
-    private IFrame frame;
-    private IFrameWriter writer;
-    private int batchSize;
-    private long batchInterval;
-    private int tuplesInFrame = 0;
-    private TimeBasedFlushTask flushTask;
-    private Timer timer;
-    private Object lock = new Object();
-    private boolean activeTimer = false;
-
-    private CounterTimerTupleForwarder(int batchSize, long batchInterval) {
-        this.batchSize = batchSize;
-        this.batchInterval = batchInterval;
-        if (batchInterval > 0L) {
-            activeTimer = true;
-        }
-    }
-
-    // Factory method
-    public static CounterTimerTupleForwarder create(Map<String, String> configuration) {
-        int batchSize = -1;
-        long batchInterval = 0L;
-        String propValue = configuration.get(BATCH_SIZE);
-        if (propValue != null) {
-            batchSize = Integer.parseInt(propValue);
-        }
-        propValue = configuration.get(BATCH_INTERVAL);
-        if (propValue != null) {
-            batchInterval = Long.parseLong(propValue);
-        }
-        return new CounterTimerTupleForwarder(batchSize, batchInterval);
-    }
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
-        this.appender = new FrameTupleAppender();
-        this.frame = new VSizeFrame(ctx);
-        appender.reset(frame, true);
-        this.writer = writer;
-        if (activeTimer) {
-            this.timer = new Timer();
-            this.flushTask = new TimeBasedFlushTask(writer, lock);
-            timer.scheduleAtFixedRate(flushTask, 0, batchInterval);
-        }
-    }
-
-    @Override
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
-        if (activeTimer) {
-            synchronized (lock) {
-                addTupleToFrame(tb);
-            }
-        } else {
-            addTupleToFrame(tb);
-        }
-        tuplesInFrame++;
-    }
-
-    private void addTupleToFrame(ArrayTupleBuilder tb) throws HyracksDataException {
-        if (tuplesInFrame == batchSize
-                || !appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("flushing frame containg (" + tuplesInFrame + ") tuples");
-            }
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-            tuplesInFrame = 0;
-            appender.reset(frame, true);
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
-            }
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            if (activeTimer) {
-                synchronized (lock) {
-                    FrameUtils.flushFrame(frame.getBuffer(), writer);
-                }
-            } else {
-                FrameUtils.flushFrame(frame.getBuffer(), writer);
-            }
-        }
-
-        if (timer != null) {
-            timer.cancel();
-        }
-    }
-
-    private class TimeBasedFlushTask extends TimerTask {
-
-        private IFrameWriter writer;
-        private final Object lock;
-
-        public TimeBasedFlushTask(IFrameWriter writer, Object lock) {
-            this.writer = writer;
-            this.lock = lock;
-        }
-
-        @Override
-        public void run() {
-            try {
-                if (tuplesInFrame > 0) {
-                    if (LOGGER.isInfoEnabled()) {
-                        LOGGER.info("TTL expired flushing frame (" + tuplesInFrame + ")");
-                    }
-                    synchronized (lock) {
-                        FrameUtils.flushFrame(frame.getBuffer(), writer);
-                        appender.reset(frame, true);
-                        tuplesInFrame = 0;
-                    }
-                }
-            } catch (HyracksDataException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 9826be7..164ff68 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -26,14 +26,12 @@
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
-import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -58,10 +56,9 @@
     protected long incomingRecordsCount = 0;
     protected long failedRecordsCount = 0;
 
-    public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
-            FeedLogManager feedLogManager, int numOfOutputFields, IRecordDataParser<T> dataParser,
-            IRecordReader<T> recordReader) throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
+    public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfOutputFields,
+            IRecordDataParser<T> dataParser, IRecordReader<T> recordReader) throws HyracksDataException {
+        super(ctx, feedLogManager, numOfOutputFields);
         this.dataParser = dataParser;
         this.recordReader = recordReader;
         recordReader.setFeedLogManager(feedLogManager);
@@ -79,7 +76,7 @@
         }
         Throwable failure = null;
         try {
-            tupleForwarder.initialize(ctx, writer);
+            this.tupleForwarder = new TupleForwarder(ctx, writer);
             while (hasNext()) {
                 IRawRecord<? extends T> record = next();
                 if (record == null) {
@@ -102,17 +99,14 @@
                 try {
                     flush();
                 } catch (Exception flushException) {
-                    tupleForwarder.fail();
                     flushException.addSuppressed(e);
                     failure = flushException;
                 }
             } else {
                 failure = e;
-                tupleForwarder.fail();
             }
         } catch (Throwable e) {
             failure = e;
-            tupleForwarder.fail();
             LOGGER.log(Level.WARN, "Failure while operating a feed source", e);
         } finally {
             failure = finish(failure);
@@ -168,11 +162,17 @@
     }
 
     private Throwable finish(Throwable failure) {
-        Throwable th = CleanupUtils.close(recordReader, null);
-        th = DataflowUtils.close(tupleForwarder, th);
+        Throwable th = CleanupUtils.close(recordReader, failure);
+        if (th == null) {
+            try {
+                tupleForwarder.complete();
+            } catch (Throwable completeFailure) {
+                th = completeFailure;
+            }
+        }
         closeSignal();
         setState(State.STOPPED);
-        return ExceptionUtils.suppress(failure, th);
+        return th;
     }
 
     private boolean parseAndForward(IRawRecord<? extends T> record) throws IOException {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 025520e..ffa42e5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -31,9 +31,9 @@
     private final AsterixInputStream stream;
     protected long incomingRecordsCount = 0;
 
-    public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
-            FeedLogManager feedLogManager, IStreamDataParser streamParser, AsterixInputStream inputStream) {
-        super(ctx, tupleForwarder, feedLogManager, 1);
+    public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager,
+            IStreamDataParser streamParser, AsterixInputStream inputStream) {
+        super(ctx, feedLogManager, 1);
         this.dataParser = streamParser;
         this.stream = inputStream;
     }
@@ -41,7 +41,7 @@
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException {
         try {
-            tupleForwarder.initialize(ctx, writer);
+            tupleForwarder = new TupleForwarder(ctx, writer);
             while (true) {
                 if (!parseNext()) {
                     break;
@@ -50,10 +50,9 @@
                 tupleForwarder.addTuple(tb);
                 incomingRecordsCount++;
             }
-        } catch (Exception e) {
+            tupleForwarder.complete();
+        } catch (Throwable e) {
             throw HyracksDataException.create(e);
-        } finally {
-            tupleForwarder.close();
         }
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
deleted file mode 100644
index f824b67..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataflow;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.asterix.external.util.DataflowUtils;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class FeedTupleForwarder implements ITupleForwarder {
-
-    private final FeedLogManager feedLogManager;
-    private FrameTupleAppender appender;
-    private IFrame frame;
-    private IFrameWriter writer;
-    private boolean paused = false;
-    private boolean initialized;
-    private boolean failed;
-
-    public FeedTupleForwarder(FeedLogManager feedLogManager) {
-        this.feedLogManager = feedLogManager;
-    }
-
-    public FeedLogManager getFeedLogManager() {
-        return feedLogManager;
-    }
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
-        if (!initialized) {
-            this.frame = new VSizeFrame(ctx);
-            this.writer = writer;
-            this.appender = new FrameTupleAppender(frame);
-            initialized = true;
-        }
-    }
-
-    @Override
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
-        if (paused) {
-            synchronized (this) {
-                while (paused) {
-                    try {
-                        wait();
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        throw HyracksDataException.create(e);
-                    }
-                }
-            }
-        }
-        DataflowUtils.addTupleToFrame(appender, tb, writer);
-    }
-
-    public void pause() {
-        paused = true;
-    }
-
-    public synchronized void resume() {
-        paused = false;
-        notifyAll();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        Throwable throwable = null;
-        try {
-            if (!failed && appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(frame.getBuffer(), writer);
-            }
-        } catch (Throwable th) {
-            throwable = th;
-            throw th;
-        } finally {
-            try {
-                feedLogManager.close();
-            } catch (IOException e) {
-                if (throwable != null) {
-                    throwable.addSuppressed(e);
-                } else {
-                    throw HyracksDataException.create(e);
-                }
-            } catch (Throwable th) {
-                if (throwable != null) {
-                    throwable.addSuppressed(th);
-                } else {
-                    throw th;
-                }
-            }
-        }
-    }
-
-    public void flush() throws HyracksDataException {
-        appender.flush(writer);
-    }
-
-    public void fail() throws HyracksDataException {
-        failed = true;
-        writer.fail();
-    }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index c7f6d9c..289c16f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -30,10 +30,9 @@
 
     protected final IRecordWithMetadataParser<T> dataParser;
 
-    public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
-            FeedLogManager feedLogManager, int numOfOutputFields, IRecordWithMetadataParser<T> dataParser,
-            IRecordReader<T> recordReader) throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+    public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfOutputFields,
+            IRecordWithMetadataParser<T> dataParser, IRecordReader<T> recordReader) throws HyracksDataException {
+        super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader);
         this.dataParser = dataParser;
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
deleted file mode 100644
index 18927cd..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataflow;
-
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class FrameFullTupleForwarder implements ITupleForwarder {
-
-    private FrameTupleAppender appender;
-    private IFrame frame;
-    private IFrameWriter writer;
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
-        this.appender = new FrameTupleAppender();
-        this.frame = new VSizeFrame(ctx);
-        this.writer = writer;
-        appender.reset(frame, true);
-    }
-
-    @Override
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
-        boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
-        if (!success) {
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-            appender.reset(frame, true);
-            success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
-            if (!success) {
-                throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
-            }
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-        }
-
-    }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
index c4f75e3..b956295 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
@@ -23,7 +23,6 @@
 import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -31,10 +30,9 @@
 public class IndexingDataFlowController<T> extends RecordDataFlowController<T> {
     private final IExternalIndexer indexer;
 
-    public IndexingDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
-            IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader, IExternalIndexer indexer)
-            throws IOException {
-        super(ctx, tupleForwarder, dataParser, recordReader, 1 + indexer.getNumberOfFields());
+    public IndexingDataFlowController(IHyracksTaskContext ctx, IRecordDataParser<T> dataParser,
+            IRecordReader<? extends T> recordReader, IExternalIndexer indexer) throws IOException {
+        super(ctx, dataParser, recordReader, 1 + indexer.getNumberOfFields());
         this.indexer = indexer;
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
deleted file mode 100644
index f34b77d..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataflow;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class RateControlledTupleForwarder implements ITupleForwarder {
-
-    private FrameTupleAppender appender;
-    private IFrame frame;
-    private IFrameWriter writer;
-    private long interTupleInterval;
-    private boolean delayConfigured;
-
-    public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
-
-    private RateControlledTupleForwarder(long interTupleInterval) {
-        this.interTupleInterval = interTupleInterval;
-        delayConfigured = interTupleInterval != 0L;
-    }
-
-    // Factory method
-    public static RateControlledTupleForwarder create(Map<String, String> configuration) {
-        long interTupleInterval = 0L;
-        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
-        if (propValue != null) {
-            interTupleInterval = Long.parseLong(propValue);
-        }
-        return new RateControlledTupleForwarder(interTupleInterval);
-    }
-
-    @Override
-    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
-        this.appender = new FrameTupleAppender();
-        this.frame = new VSizeFrame(ctx);
-        this.writer = writer;
-        appender.reset(frame, true);
-    }
-
-    @Override
-    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
-        if (delayConfigured) {
-            try {
-                Thread.sleep(interTupleInterval);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw HyracksDataException.create(e);
-            }
-        }
-        boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
-        if (!success) {
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-            appender.reset(frame, true);
-            success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
-            if (!success) {
-                throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
-            }
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(frame.getBuffer(), writer);
-        }
-    }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
index 0f9572d..aebdefb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -21,7 +21,6 @@
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -33,9 +32,9 @@
     protected final IRecordReader<? extends T> recordReader;
     protected final int numOfTupleFields;
 
-    public RecordDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
-            IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader, int numOfTupleFields) {
-        super(ctx, tupleForwarder);
+    public RecordDataFlowController(IHyracksTaskContext ctx, IRecordDataParser<T> dataParser,
+            IRecordReader<? extends T> recordReader, int numOfTupleFields) {
+        super(ctx);
         this.dataParser = dataParser;
         this.recordReader = recordReader;
         this.numOfTupleFields = numOfTupleFields;
@@ -45,7 +44,7 @@
     public void start(IFrameWriter writer) throws HyracksDataException {
         try {
             ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields);
-            tupleForwarder.initialize(ctx, writer);
+            TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer);
             while (recordReader.hasNext()) {
                 IRawRecord<? extends T> record = recordReader.next();
                 tb.reset();
@@ -54,7 +53,7 @@
                 appendOtherTupleFields(tb);
                 tupleForwarder.addTuple(tb);
             }
-            tupleForwarder.close();
+            tupleForwarder.complete();
             recordReader.close();
         } catch (Exception e) {
             throw new HyracksDataException(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
index ccf22da..a28c484 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.external.dataflow;
 
 import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -28,9 +27,8 @@
 public class StreamDataFlowController extends AbstractDataFlowController {
     private final IStreamDataParser dataParser;
 
-    public StreamDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
-            IStreamDataParser dataParser) {
-        super(ctx, tupleForwarder);
+    public StreamDataFlowController(IHyracksTaskContext ctx, IStreamDataParser dataParser) {
+        super(ctx);
         this.dataParser = dataParser;
     }
 
@@ -38,7 +36,7 @@
     public void start(IFrameWriter writer) throws HyracksDataException {
         try {
             ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
-            tupleForwarder.initialize(ctx, writer);
+            TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer);
             while (true) {
                 tb.reset();
                 if (!dataParser.parse(tb.getDataOutput())) {
@@ -47,7 +45,7 @@
                 tb.addFieldEndOffset();
                 tupleForwarder.addTuple(tb);
             }
-            tupleForwarder.close();
+            tupleForwarder.complete();
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/TupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/TupleForwarder.java
new file mode 100644
index 0000000..afb00b0
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/TupleForwarder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+public class TupleForwarder {
+
+    private final FrameTupleAppender appender;
+    private final IFrame frame;
+    private final IFrameWriter writer;
+
+    public TupleForwarder(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
+        this.frame = new VSizeFrame(ctx);
+        this.writer = writer;
+        this.appender = new FrameTupleAppender(frame);
+    }
+
+    public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+        DataflowUtils.addTupleToFrame(appender, tb, writer);
+    }
+
+    public void flush() throws HyracksDataException {
+        appender.flush(writer);
+    }
+
+    public void complete() throws HyracksDataException {
+        appender.write(writer, false);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index fd9db7e..2a92d40 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -18,12 +18,15 @@
  */
 package org.apache.asterix.external.dataset.adapter;
 
+import java.io.Closeable;
+import java.io.IOException;
+
 import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class FeedAdapter implements IDataSourceAdapter {
+public class FeedAdapter implements IDataSourceAdapter, Closeable {
     private final AbstractFeedDataFlowController controller;
 
     public FeedAdapter(AbstractFeedDataFlowController controller) {
@@ -54,4 +57,9 @@
     public AbstractFeedDataFlowController getController() {
         return controller;
     }
+
+    @Override
+    public void close() throws IOException {
+        controller.close();
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 0503677..867fb60 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 import org.apache.hyracks.dataflow.common.utils.TaskUtil;
@@ -60,6 +61,7 @@
 
     @Override
     protected void start() throws HyracksDataException, InterruptedException {
+        Throwable failure = null;
         Thread.currentThread().setName("Intake Thread");
         try {
             writer.open();
@@ -79,11 +81,16 @@
             message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
             message.getBuffer().flip();
             run();
-        } catch (Exception e) {
+        } catch (Throwable e) {
+            failure = e;
+            CleanupUtils.fail(writer, e);
             LOGGER.log(Level.WARN, "Failure during data ingestion", e);
-            throw e;
         } finally {
-            writer.close();
+            failure = CleanupUtils.close(adapter, failure);
+            failure = CleanupUtils.close(writer, failure);
+        }
+        if (failure != null) {
+            throw HyracksDataException.create(failure);
         }
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 78f24a5..3e53b52 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -42,12 +42,10 @@
 import org.apache.asterix.external.dataflow.ChangeFeedWithMetaDataFlowController;
 import org.apache.asterix.external.dataflow.FeedRecordDataFlowController;
 import org.apache.asterix.external.dataflow.FeedStreamDataFlowController;
-import org.apache.asterix.external.dataflow.FeedTupleForwarder;
 import org.apache.asterix.external.dataflow.FeedWithMetaDataFlowController;
 import org.apache.asterix.external.dataflow.IndexingDataFlowController;
 import org.apache.asterix.external.dataflow.RecordDataFlowController;
 import org.apache.asterix.external.dataflow.StreamDataFlowController;
-import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.asterix.om.types.ARecordType;
@@ -70,35 +68,29 @@
                     IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
                     IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
                     if (indexingOp) {
-                        return new IndexingDataFlowController(ctx,
-                                DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
-                                recordReader, ((IIndexingDatasource) recordReader).getIndexer());
+                        return new IndexingDataFlowController(ctx, dataParser, recordReader,
+                                ((IIndexingDatasource) recordReader).getIndexer());
                     } else if (isFeed) {
-                        FeedTupleForwarder tupleForwarder =
-                                (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager);
                         boolean isChangeFeed = ExternalDataUtils.isChangeFeed(configuration);
                         boolean isRecordWithMeta = ExternalDataUtils.isRecordWithMeta(configuration);
                         if (isRecordWithMeta) {
                             if (isChangeFeed) {
                                 int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
-                                return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
-                                        numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader);
+                                return new ChangeFeedWithMetaDataFlowController(ctx, feedLogManager, numOfKeys + 2,
+                                        (IRecordWithMetadataParser) dataParser, recordReader);
                             } else {
-                                return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2,
+                                return new FeedWithMetaDataFlowController(ctx, feedLogManager, 2,
                                         (IRecordWithMetadataParser) dataParser, recordReader);
                             }
                         } else if (isChangeFeed) {
                             int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
-                            return new ChangeFeedDataFlowController(ctx, tupleForwarder, feedLogManager, numOfKeys + 1,
+                            return new ChangeFeedDataFlowController(ctx, feedLogManager, numOfKeys + 1,
                                     (IRecordWithPKDataParser) dataParser, recordReader);
                         } else {
-                            return new FeedRecordDataFlowController(ctx, tupleForwarder, feedLogManager, 1, dataParser,
-                                    recordReader);
+                            return new FeedRecordDataFlowController(ctx, feedLogManager, 1, dataParser, recordReader);
                         }
                     } else {
-                        return new RecordDataFlowController(ctx,
-                                DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
-                                recordReader, 1);
+                        return new RecordDataFlowController(ctx, dataParser, recordReader, 1);
                     }
                 case STREAM:
                     IInputStreamFactory streamFactory = (IInputStreamFactory) dataSourceFactory;
@@ -107,12 +99,9 @@
                     IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition);
                     streamParser.setInputStream(stream);
                     if (isFeed) {
-                        return new FeedStreamDataFlowController(ctx,
-                                (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager),
-                                feedLogManager, streamParser, stream);
+                        return new FeedStreamDataFlowController(ctx, feedLogManager, streamParser, stream);
                     } else {
-                        return new StreamDataFlowController(ctx, DataflowUtils.getTupleForwarder(configuration, null),
-                                streamParser);
+                        return new StreamDataFlowController(ctx, streamParser);
                     }
                 default:
                     throw new RuntimeDataException(ErrorCode.PROVIDER_DATAFLOW_CONTROLLER_UNKNOWN_DATA_SOURCE,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
index 438f1df..37d400f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -18,28 +18,15 @@
  */
 package org.apache.asterix.external.util;
 
-import java.util.Map;
-
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.asterix.external.api.ITupleForwarder.TupleForwardPolicy;
-import org.apache.asterix.external.dataflow.CounterTimerTupleForwarder;
-import org.apache.asterix.external.dataflow.FeedTupleForwarder;
-import org.apache.asterix.external.dataflow.FrameFullTupleForwarder;
-import org.apache.asterix.external.dataflow.RateControlledTupleForwarder;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 public class DataflowUtils {
-    private static final Logger LOGGER = LogManager.getLogger();
 
     private DataflowUtils() {
     }
@@ -54,32 +41,6 @@
         }
     }
 
-    public static ITupleForwarder getTupleForwarder(Map<String, String> configuration, FeedLogManager feedLogManager)
-            throws HyracksDataException {
-        ITupleForwarder.TupleForwardPolicy policyType = null;
-        String propValue = configuration.get(ITupleForwarder.FORWARD_POLICY);
-        if (ExternalDataUtils.isFeed(configuration)) {
-            // TODO pass this value in the configuration and avoid this check for feeds
-            policyType = TupleForwardPolicy.FEED;
-        } else if (propValue == null) {
-            policyType = TupleForwardPolicy.FRAME_FULL;
-        } else {
-            policyType = TupleForwardPolicy.valueOf(propValue.trim().toUpperCase());
-        }
-        switch (policyType) {
-            case FEED:
-                return new FeedTupleForwarder(feedLogManager);
-            case FRAME_FULL:
-                return new FrameFullTupleForwarder();
-            case COUNTER_TIMER_EXPIRED:
-                return CounterTimerTupleForwarder.create(configuration);
-            case RATE_CONTROLLED:
-                return RateControlledTupleForwarder.create(configuration);
-            default:
-                throw new RuntimeDataException(ErrorCode.UTIL_DATAFLOW_UTILS_UNKNOWN_FORWARD_POLICY);
-        }
-    }
-
     public static void addTupleToFrame(FrameTupleAppender appender, ITupleReference tuple, IFrameWriter writer)
             throws HyracksDataException {
         if (!appender.append(tuple)) {
@@ -89,30 +50,4 @@
             }
         }
     }
-
-    /**
-     * Close the ITupleForwarder and suppress any Throwable thrown by the close call.
-     * This method must NEVER throw any Throwable
-     *
-     * @param indexHelper
-     *            the indexHelper to close
-     * @param root
-     *            the first exception encountered during release of resources
-     * @return the root Throwable if not null or a new Throwable if any was thrown, otherwise, it returns null
-     */
-    public static Throwable close(ITupleForwarder tupleForwarder, Throwable root) {
-        if (tupleForwarder != null) {
-            try {
-                tupleForwarder.close();
-            } catch (Throwable th) { // NOSONAR Will be re-thrown
-                try {
-                    LOGGER.log(Level.WARN, "Failure closing a closeable resource", th);
-                } catch (Throwable ignore) { // NOSONAR Logging exception will be ignored
-                    // NOSONAR ignore
-                }
-                root = ExceptionUtils.suppress(root, th);
-            }
-        }
-        return root;
-    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
index 9d887b6..42ebbd4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@ -20,6 +20,7 @@
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -35,7 +36,7 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class FeedLogManager {
+public class FeedLogManager implements Closeable {
 
     public enum LogEntryType {
         START, // partition start
@@ -64,7 +65,7 @@
     public FeedLogManager(File file) throws HyracksDataException {
         try {
             this.dir = file.toPath();
-            this.completed = new TreeSet<String>();
+            this.completed = new TreeSet<>();
             if (!exists()) {
                 create();
             }
@@ -100,16 +101,16 @@
 
     public synchronized void open() throws IOException {
         // read content of logs.
-        BufferedReader reader = Files.newBufferedReader(
-                Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME));
-        String log = reader.readLine();
-        while (log != null) {
-            if (log.startsWith(END_PREFIX)) {
-                completed.add(getSplitId(log));
+        try (BufferedReader reader = Files.newBufferedReader(
+                Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME))) {
+            String log = reader.readLine();
+            while (log != null) {
+                if (log.startsWith(END_PREFIX)) {
+                    completed.add(getSplitId(log));
+                }
+                log = reader.readLine();
             }
-            log = reader.readLine();
         }
-        reader.close();
 
         progressLogger = Files.newBufferedWriter(
                 Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME),
@@ -122,6 +123,7 @@
                 StandardCharsets.UTF_8, StandardOpenOption.APPEND);
     }
 
+    @Override
     public synchronized void close() throws IOException {
         count--;
         if (count > 0) {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 6fe938c..8ee8a57 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -28,11 +28,8 @@
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
-import org.apache.asterix.external.api.ITupleForwarder;
+import org.apache.asterix.external.dataflow.TupleForwarder;
 import org.apache.asterix.external.parser.ADMDataParser;
-import org.apache.asterix.external.util.DataflowUtils;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -79,26 +76,19 @@
             @Override
             public ITupleParser createTupleParser(IHyracksTaskContext ctx) throws HyracksDataException {
                 ADMDataParser parser;
-                ITupleForwarder forwarder;
+
                 ArrayTupleBuilder tb;
                 IApplicationContext appCtx =
                         (IApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
                 ClusterPartition nodePartition = appCtx.getMetadataProperties().getNodePartitions().get(nodeId)[0];
                 parser = new ADMDataParser(outputType, true);
-                forwarder =
-                        DataflowUtils
-                                .getTupleForwarder(configuration,
-                                        FeedUtils.getFeedLogManager(ctx, FeedUtils.splitsForAdapter(
-                                                ExternalDataUtils.getDataverse(configuration),
-                                                ExternalDataUtils.getFeedName(configuration), nodeId, nodePartition)));
                 tb = new ArrayTupleBuilder(1);
                 return new ITupleParser() {
-
                     @Override
                     public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
                         try {
                             parser.setInputStream(in);
-                            forwarder.initialize(ctx, writer);
+                            TupleForwarder forwarder = new TupleForwarder(ctx, writer);
                             while (true) {
                                 tb.reset();
                                 if (!parser.parse(tb.getDataOutput())) {
@@ -107,7 +97,7 @@
                                 tb.addFieldEndOffset();
                                 forwarder.addTuple(tb);
                             }
-                            forwarder.close();
+                            forwarder.complete();
                         } catch (Exception e) {
                             throw HyracksDataException.create(e);
                         }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
index bf54e01..a116320 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
@@ -107,11 +107,11 @@
 
     @Override
     public void close() throws HyracksDataException {
-        writer.close();
-        downstreamOpen = false;
         if (downstreamFailed && !failCalledByUpstream) {
             throw HyracksDataException.create(ErrorCode.MISSED_FAIL_CALL);
         }
+        writer.close();
+        downstreamOpen = false;
     }
 
     public static IFrameWriter enforce(IFrameWriter writer) {
