Add Unit Tests for Feed Runtime Input Handler

Change-Id: I7088f489a7d53dee8cf6cdbf5baa7cd8d3884f55
Reviewed-on: https://asterix-gerrit.ics.uci.edu/866
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <michael.blow@couchbase.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 3920a03..d201a6a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -35,7 +35,7 @@
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 /**
- * TODO: Add unit test cases for this class
+ * TODO: Add Failure cases unit tests for this class
  * Provides for error-handling and input-side buffering for a feed runtime.
  * .............______.............
  * ............|......|............
@@ -48,8 +48,9 @@
 public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
 
     private static final Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
-    private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0);
     private static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8;
+    private static final boolean DEBUG = false;
+    private final Object mutex = new Object();
     private final FeedExceptionHandler exceptionHandler;
     private final FrameSpiller spiller;
     private final FeedPolicyAccessor fpa;
@@ -58,15 +59,16 @@
     private final FrameTransporter consumer;
     private final Thread consumerThread;
     private final LinkedBlockingDeque<ByteBuffer> inbox;
-    private final ConcurrentFramePool memoryManager;
+    private final ConcurrentFramePool framePool;
     private Mode mode = Mode.PROCESS;
+    private int total = 0;
     private int numDiscarded = 0;
     private int numSpilled = 0;
     private int numProcessedInMemory = 0;
     private int numStalled = 0;
 
     public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
-            IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool feedMemoryManager)
+            IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool framePool)
             throws HyracksDataException {
         this.writer = writer;
         this.spiller =
@@ -76,13 +78,13 @@
                         fpa.getMaxSpillOnDisk());
         this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
         this.fpa = fpa;
-        this.memoryManager = feedMemoryManager;
+        this.framePool = framePool;
         this.inbox = new LinkedBlockingDeque<>();
         this.consumer = new FrameTransporter();
-        this.consumerThread = new Thread();
+        this.consumerThread = new Thread(consumer);
         this.consumerThread.start();
         this.initialFrameSize = ctx.getInitialFrameSize();
-        this.frameAction = new FrameAction(inbox);
+        this.frameAction = new FrameAction();
     }
 
     @Override
@@ -101,15 +103,20 @@
 
     @Override
     public void close() throws HyracksDataException {
-        inbox.add(POISON_PILL);
-        notify();
+        consumer.poison();
+        synchronized (mutex) {
+            if (DEBUG) {
+                LOGGER.info("Producer is waking up consumer");
+            }
+            mutex.notify();
+        }
         try {
             consumerThread.join();
         } catch (InterruptedException e) {
             LOGGER.log(Level.WARNING, e.getMessage(), e);
         }
         try {
-            memoryManager.release(inbox);
+            framePool.release(inbox);
         } catch (Throwable th) {
             LOGGER.log(Level.WARNING, th.getMessage(), th);
         }
@@ -124,9 +131,13 @@
     @Override
     public void nextFrame(ByteBuffer frame) throws HyracksDataException {
         try {
+            total++;
             if (consumer.cause() != null) {
                 throw consumer.cause();
             }
+            if (DEBUG) {
+                LOGGER.info("nextFrame() called. inputHandler is in mode: " + mode.toString());
+            }
             switch (mode) {
                 case PROCESS:
                     process(frame);
@@ -148,25 +159,42 @@
         }
     }
 
+    // For unit testing purposes
+    public int framesOnDisk() {
+        return spiller.remaining();
+    }
+
     private ByteBuffer getFreeBuffer(int frameSize) throws HyracksDataException {
         int numFrames = frameSize / initialFrameSize;
         if (numFrames == 1) {
-            return memoryManager.get();
+            return framePool.get();
         } else {
-            return memoryManager.get(frameSize);
+            return framePool.get(frameSize);
         }
     }
 
     private void discard(ByteBuffer frame) throws HyracksDataException {
+        if (DEBUG) {
+            LOGGER.info("starting discard(frame)");
+        }
         if (fpa.spillToDiskOnCongestion()) {
+            if (DEBUG) {
+                LOGGER.info("Spilling to disk is enabled. Will try that");
+            }
             if (spiller.spill(frame)) {
                 numSpilled++;
                 mode = Mode.SPILL;
                 return;
             }
         } else {
+            if (DEBUG) {
+                LOGGER.info("Spilling to disk is disabled. Will try to get a buffer");
+            }
             ByteBuffer next = getFreeBuffer(frame.capacity());
             if (next != null) {
+                if (DEBUG) {
+                    LOGGER.info("Was able to get a buffer");
+                }
                 numProcessedInMemory++;
                 next.put(frame);
                 inbox.offer(next);
@@ -174,102 +202,168 @@
                 return;
             }
         }
-        numDiscarded++;
+        if (((numDiscarded + 1.0) / total) > fpa.getMaxFractionDiscard()) {
+            if (DEBUG) {
+                LOGGER.info("in discard(frame). Discard allowance has been consumed. --> Stalling");
+            }
+            stall(frame);
+        } else {
+            if (DEBUG) {
+                LOGGER.info("in discard(frame). So far, I have discarded " + numDiscarded);
+            }
+            numDiscarded++;
+        }
     }
 
-    private synchronized void exitProcessState(ByteBuffer frame) throws HyracksDataException {
+    private void exitProcessState(ByteBuffer frame) throws HyracksDataException {
         if (fpa.spillToDiskOnCongestion()) {
             mode = Mode.SPILL;
             spiller.open();
             spill(frame);
         } else {
+            if (DEBUG) {
+                LOGGER.info("Spilling is disabled --> discardOrStall(frame)");
+            }
             discardOrStall(frame);
         }
     }
 
     private void discardOrStall(ByteBuffer frame) throws HyracksDataException {
         if (fpa.discardOnCongestion()) {
-            numDiscarded++;
             mode = Mode.DISCARD;
             discard(frame);
         } else {
+            if (DEBUG) {
+                LOGGER.info("Discard is disabled --> stall(frame)");
+            }
             stall(frame);
         }
     }
 
     private void stall(ByteBuffer frame) throws HyracksDataException {
         try {
+            if (DEBUG) {
+                LOGGER.info("in stall(frame). So far, I have stalled " + numStalled);
+            }
             numStalled++;
             // If spilling is enabled, we wait on the spiller
             if (fpa.spillToDiskOnCongestion()) {
-                synchronized (spiller) {
-                    while (spiller.usedBudget() > MAX_SPILL_USED_BEFORE_RESUME) {
-                        spiller.wait();
-                    }
+                if (DEBUG) {
+                    LOGGER.info("in stall(frame). Spilling is enabled so we will attempt to spill");
                 }
+                waitforSpillSpace();
                 spiller.spill(frame);
-                synchronized (this) {
-                    notify();
+                numSpilled++;
+                synchronized (mutex) {
+                    if (DEBUG) {
+                        LOGGER.info("Producer is waking up consumer");
+                    }
+                    mutex.notify();
                 }
                 return;
             }
+            if (DEBUG) {
+                LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to frame pool");
+            }
             // Spilling is disabled, we subscribe to feedMemoryManager
             frameAction.setFrame(frame);
-            synchronized (frameAction) {
-                if (memoryManager.subscribe(frameAction)) {
-                    frameAction.wait();
-                }
+            framePool.subscribe(frameAction);
+            ByteBuffer temp = frameAction.retrieve();
+            inbox.put(temp);
+            numProcessedInMemory++;
+            if (DEBUG) {
+                LOGGER.info("stall(frame) has been completed. Notifying the consumer that a frame is ready");
             }
-            synchronized (this) {
-                notify();
+            synchronized (mutex) {
+                if (DEBUG) {
+                    LOGGER.info("Producer is waking up consumer");
+                }
+                mutex.notify();
             }
         } catch (InterruptedException e) {
             throw new HyracksDataException(e);
         }
     }
 
+    private void waitforSpillSpace() throws InterruptedException {
+        synchronized (spiller) {
+            while (spiller.usedBudget() > MAX_SPILL_USED_BEFORE_RESUME) {
+                if (DEBUG) {
+                    LOGGER.info("in stall(frame). Spilling has been consumed. We will wait for it to be less than "
+                            + MAX_SPILL_USED_BEFORE_RESUME + " consumed. Current consumption = "
+                            + spiller.usedBudget());
+                }
+                spiller.wait();
+            }
+        }
+    }
+
     private void process(ByteBuffer frame) throws HyracksDataException {
-        // Get a page from
-        ByteBuffer next = getFreeBuffer(frame.capacity());
+        // Get a page from frame pool
+        ByteBuffer next = (frame.capacity() <= framePool.getMaxFrameSize()) ? getFreeBuffer(frame.capacity()) : null;
         if (next != null) {
+            // Got a page from memory pool
             numProcessedInMemory++;
             next.put(frame);
-            inbox.offer(next);
-            if (inbox.size() == 1) {
-                synchronized (this) {
-                    notify();
-                }
+            try {
+                inbox.put(next);
+                notifyMemoryConsumer();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
             }
         } else {
-            // out of memory. we switch to next mode as per policy -- synchronized method
+            if (DEBUG) {
+                LOGGER.info("Couldn't allocate memory --> exitProcessState(frame)");
+            }
+            // Out of memory. we switch to next mode as per policy
             exitProcessState(frame);
         }
     }
 
+    private void notifyMemoryConsumer() {
+        if (inbox.size() == 1) {
+            synchronized (mutex) {
+                if (DEBUG) {
+                    LOGGER.info("Producer is waking up consumer");
+                }
+                mutex.notify();
+            }
+        }
+    }
+
     private void spill(ByteBuffer frame) throws HyracksDataException {
         if (spiller.switchToMemory()) {
-            synchronized (this) {
+            synchronized (mutex) {
                 // Check if there is memory
-                ByteBuffer next = getFreeBuffer(frame.capacity());
+                ByteBuffer next = null;
+                if (frame.capacity() <= framePool.getMaxFrameSize()) {
+                    next = getFreeBuffer(frame.capacity());
+                }
                 if (next != null) {
                     spiller.close();
                     numProcessedInMemory++;
                     next.put(frame);
                     inbox.offer(next);
+                    notifyMemoryConsumer();
                     mode = Mode.PROCESS;
                 } else {
-                    // spill. This will always succeed since spilled = 0 (must verify that budget can't be 0)
+                    // spill. This will always succeed since spilled = 0 (TODO must verify that budget can't be 0)
                     spiller.spill(frame);
                     numSpilled++;
-                    notify();
+                    if (DEBUG) {
+                        LOGGER.info("Producer is waking up consumer");
+                    }
+                    mutex.notify();
                 }
             }
         } else {
             // try to spill. If failed switch to either discard or stall
             if (spiller.spill(frame)) {
+                notifyDiskConsumer();
                 numSpilled++;
             } else {
                 if (fpa.discardOnCongestion()) {
+                    mode = Mode.DISCARD;
                     discard(frame);
                 } else {
                     stall(frame);
@@ -278,11 +372,22 @@
         }
     }
 
+    private void notifyDiskConsumer() {
+        if (spiller.remaining() == 1) {
+            synchronized (mutex) {
+                if (DEBUG) {
+                    LOGGER.info("Producer is waking up consumer");
+                }
+                mutex.notify();
+            }
+        }
+    }
+
     public Mode getMode() {
         return mode;
     }
 
-    public synchronized void setMode(Mode mode) {
+    public void setMode(Mode mode) {
         this.mode = mode;
     }
 
@@ -311,15 +416,22 @@
 
     private class FrameTransporter implements Runnable {
         private volatile Throwable cause;
+        private int consumed = 0;
+        private boolean poisoned = false;
 
         public Throwable cause() {
             return cause;
         }
 
+        public void poison() {
+            poisoned = true;
+        }
+
         private Throwable consume(ByteBuffer frame) {
             while (frame != null) {
                 try {
                     writer.nextFrame(frame);
+                    consumed++;
                     frame = null;
                 } catch (HyracksDataException e) {
                     // It is fine to catch throwable here since this thread is always expected to terminate gracefully
@@ -340,7 +452,7 @@
         public void run() {
             try {
                 ByteBuffer frame = inbox.poll();
-                while (frame != POISON_PILL) {
+                while (true) {
                     if (frame != null) {
                         try {
                             if (consume(frame) != null) {
@@ -348,7 +460,7 @@
                             }
                         } finally {
                             // Done with frame.
-                            memoryManager.release(frame);
+                            framePool.release(frame);
                         }
                     }
                     frame = inbox.poll();
@@ -366,13 +478,22 @@
                         writer.flush();
                         // At this point. We consumed all memory and spilled
                         // We can't assume the next will be in memory. what if there is 0 memory?
-                        synchronized (FeedRuntimeInputHandler.this) {
+                        synchronized (mutex) {
                             frame = inbox.poll();
                             if (frame == null) {
                                 // Nothing in memory
                                 if (spiller.switchToMemory()) {
+                                    if (poisoned) {
+                                        break;
+                                    }
+                                    if (DEBUG) {
+                                        LOGGER.info("Consumer is going to sleep");
+                                    }
                                     // Nothing in disk
-                                    FeedRuntimeInputHandler.this.wait();
+                                    mutex.wait();
+                                    if (DEBUG) {
+                                        LOGGER.info("Consumer is waking up");
+                                    }
                                 }
                             }
                         }
@@ -383,5 +504,18 @@
             }
             // cleanup will always be done through the close() call
         }
+
+        @Override
+        public String toString() {
+            return "consumed: " + consumed;
+        }
+    }
+
+    public int getTotal() {
+        return total;
+    }
+
+    public LinkedBlockingDeque<ByteBuffer> getInternalBuffer() {
+        return inbox;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
index 4a2120a..f02b4aa 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
@@ -19,29 +19,45 @@
 package org.apache.asterix.external.feed.dataflow;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.LinkedBlockingDeque;
 
-import rx.functions.Action1;
+import org.apache.log4j.Logger;
 
-public class FrameAction implements Action1<ByteBuffer> {
-    private final LinkedBlockingDeque<ByteBuffer> inbox;
+public class FrameAction {
+    private static final boolean DEBUG = false;
+    private static final Logger LOGGER = Logger.getLogger(FrameAction.class.getName());
+    private ByteBuffer allocated;
     private ByteBuffer frame;
 
-    public FrameAction(LinkedBlockingDeque<ByteBuffer> inbox) {
-        this.inbox = inbox;
-    }
-
-    @Override
     public void call(ByteBuffer freeFrame) {
+        if (DEBUG) {
+            LOGGER.info("FrameAction: My subscription is being answered");
+        }
         freeFrame.put(frame);
-        inbox.add(freeFrame);
         synchronized (this) {
-            notify();
+            allocated = freeFrame;
+            if (DEBUG) {
+                LOGGER.info("FrameAction: Waking up waiting threads");
+            }
+            notifyAll();
         }
     }
 
-    public ByteBuffer getFrame() {
-        return frame;
+    public synchronized ByteBuffer retrieve() throws InterruptedException {
+        if (DEBUG) {
+            LOGGER.info("FrameAction: Attempting to get allocated buffer");
+        }
+        while (allocated == null) {
+            if (DEBUG) {
+                LOGGER.info("FrameAction: Allocated buffer is not ready yet. I will wait for it");
+            }
+            wait();
+            if (DEBUG) {
+                LOGGER.info("FrameAction: Awoken Up");
+            }
+        }
+        ByteBuffer temp = allocated;
+        allocated = null;
+        return temp;
     }
 
     public void setFrame(ByteBuffer frame) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
index f0d226a..a2f19bb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
@@ -44,6 +44,7 @@
 public class FrameSpiller {
     private static final Logger LOGGER = Logger.getLogger(FrameSpiller.class.getName());
     private static final int FRAMES_PER_FILE = 1024;
+    public static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8;
 
     private final String fileNamePrefix;
     private final ArrayDeque<File> files = new ArrayDeque<>();
@@ -88,11 +89,11 @@
     }
 
     public synchronized ByteBuffer next() throws HyracksDataException {
+        frame.reset();
+        if (totalReadCount == totalWriteCount) {
+            return null;
+        }
         try {
-            frame.reset();
-            if (totalReadCount == totalWriteCount) {
-                return null;
-            }
             if (currentReadFile == null) {
                 if (!files.isEmpty()) {
                     currentReadFile = files.pop();
@@ -126,6 +127,10 @@
             return frame.getBuffer();
         } catch (Exception e) {
             throw new HyracksDataException(e);
+        } finally {
+            synchronized (this) {
+                notify();
+            }
         }
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
index 25aa86a..e5543d6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
@@ -28,10 +28,15 @@
 
 import org.apache.asterix.external.feed.dataflow.FrameAction;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
 
 public class ConcurrentFramePool {
+    private static final boolean DEBUG = false;
     private static final String ERROR_INVALID_FRAME_SIZE =
             "The size should be an integral multiple of the default frame size";
+    private static final String ERROR_LARGER_THAN_BUDGET_REQUEST =
+            "The requested frame size must not be greater than the allocated budget";
+    private static final Logger LOGGER = Logger.getLogger(ConcurrentFramePool.class.getName());
     private final String nodeId;
     private final int budget;
     private final int defaultFrameSize;
@@ -49,11 +54,31 @@
         this.largeFramesPools = new HashMap<>();
     }
 
+    public int getMaxFrameSize() {
+        return budget * defaultFrameSize;
+    }
+
     public synchronized ByteBuffer get() {
+        // Subscribers have higher priority
+        if (subscribers.isEmpty()) {
+            return doGet();
+        }
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
+                    + subscribers.size());
+        }
+        return null;
+    }
+
+    private ByteBuffer doGet() {
         if (handedOut < budget) {
             handedOut++;
             return allocate();
         }
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
+                    + ", Requested = 1");
+        }
         return null;
     }
 
@@ -61,11 +86,15 @@
         return budget - handedOut;
     }
 
-    public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException {
+    private ByteBuffer doGet(int bufferSize) throws HyracksDataException {
+        // Subscribers have higher priority
         if (bufferSize % defaultFrameSize != 0) {
             throw new HyracksDataException(ERROR_INVALID_FRAME_SIZE);
         }
         int multiplier = bufferSize / defaultFrameSize;
+        if (multiplier > budget) {
+            throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
+        }
         if (handedOut + multiplier <= budget) {
             handedOut += multiplier;
             ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiplier);
@@ -76,9 +105,26 @@
                 created += multiplier;
                 return ByteBuffer.allocate(bufferSize);
             }
-            return largeFramesPool.poll();
+            ByteBuffer buffer = largeFramesPool.poll();
+            buffer.clear();
+            return buffer;
         }
         // Not enough budget
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
+                    + ", Requested = " + multiplier);
+        }
+        return null;
+    }
+
+    public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException {
+        if (subscribers.isEmpty()) {
+            return doGet(bufferSize);
+        }
+        if (DEBUG) {
+            LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
+                    + subscribers.size());
+        }
         return null;
     }
 
@@ -121,7 +167,9 @@
             created++;
             return ByteBuffer.allocate(defaultFrameSize);
         } else {
-            return pool.pop();
+            ByteBuffer buffer = pool.pop();
+            buffer.clear();
+            return buffer;
         }
     }
 
@@ -150,6 +198,9 @@
     public synchronized void release(ByteBuffer buffer) throws HyracksDataException {
         int multiples = buffer.capacity() / defaultFrameSize;
         handedOut -= multiples;
+        if (DEBUG) {
+            LOGGER.info("Releasing " + multiples + " frames. Remaining frames = " + remaining());
+        }
         if (multiples == 1) {
             pool.add(buffer);
         } else {
@@ -163,22 +214,48 @@
         // check subscribers
         while (!subscribers.isEmpty()) {
             FrameAction frameAction = subscribers.peek();
+            ByteBuffer freeBuffer;
             // check if we have enough and answer immediately.
             if (frameAction.getSize() == defaultFrameSize) {
-                buffer = get();
+                if (DEBUG) {
+                    LOGGER.info("Attempting to callback a subscriber that requested 1 frame");
+                }
+                freeBuffer = doGet();
             } else {
-                buffer = get(frameAction.getSize());
+                if (DEBUG) {
+                    LOGGER.info("Attempting to callback a subscriber that requested "
+                            + frameAction.getSize() / defaultFrameSize + " frames");
+                }
+                freeBuffer = doGet(frameAction.getSize());
             }
-            if (buffer != null) {
+            if (freeBuffer != null) {
+                int handedOutBeforeCall = handedOut;
                 try {
-                    frameAction.call(buffer);
+                    frameAction.call(freeBuffer);
+                } catch (Exception e) {
+                    LOGGER.error("Error while attempting to answer a subscription. Buffer will be reclaimed", e);
+                    // TODO(amoudi): Add test cases and get rid of recursion
+                    if (handedOut == handedOutBeforeCall) {
+                        release(freeBuffer);
+                    }
+                    throw e;
                 } finally {
                     subscribers.remove();
+                    if (DEBUG) {
+                        LOGGER.info(
+                                "A subscription has been satisfied. " + subscribers.size() + " remaining subscribers");
+                    }
                 }
             } else {
+                if (DEBUG) {
+                    LOGGER.info("Failed to allocate requested frames");
+                }
                 break;
             }
         }
+        if (DEBUG) {
+            LOGGER.info(subscribers.size() + " remaining subscribers");
+        }
     }
 
     public synchronized boolean subscribe(FrameAction frameAction) throws HyracksDataException {
@@ -187,18 +264,30 @@
             ByteBuffer buffer;
             // check if we have enough and answer immediately.
             if (frameAction.getSize() == defaultFrameSize) {
-                buffer = get();
+                buffer = doGet();
             } else {
-                buffer = get(frameAction.getSize());
+                buffer = doGet(frameAction.getSize());
             }
             if (buffer != null) {
                 frameAction.call(buffer);
                 // There is no need to subscribe. perform action and return false
                 return false;
             }
+        } else {
+            int multiplier = frameAction.getSize() / defaultFrameSize;
+            if (multiplier > budget) {
+                throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
+            }
         }
         // none of the above, add to subscribers and return true
         subscribers.add(frameAction);
         return true;
     }
+
+    /*
+     * For unit testing purposes
+     */
+    public Collection<FrameAction> getSubscribers() {
+        return subscribers;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
index f888542..18d4cff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
@@ -35,11 +35,11 @@
     private final String targetId;
     private final int hashCode;
 
-    public FeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition, String operandId) {
+    public FeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition, String targetId) {
         this.feedId = feedId;
         this.runtimeType = runtimeType;
         this.partition = partition;
-        this.targetId = operandId;
+        this.targetId = targetId;
         this.hashCode = toString().hashCode();
     }
 
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
similarity index 74%
rename from asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
rename to asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
index 8a6d1b6..444d8a5 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
@@ -22,8 +22,10 @@
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Random;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.asterix.common.config.AsterixFeedProperties;
+import org.apache.asterix.external.feed.dataflow.FrameAction;
 import org.apache.asterix.external.feed.management.ConcurrentFramePool;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.junit.Assert;
@@ -36,7 +38,7 @@
 import junit.framework.TestSuite;
 
 @RunWith(PowerMockRunner.class)
-public class FeedMemoryManagerUnitTest extends TestCase {
+public class ConcurrentFramePoolUnitTest extends TestCase {
 
     private static final int DEFAULT_FRAME_SIZE = 32768;
     private static final int NUM_FRAMES = 2048;
@@ -44,8 +46,9 @@
     private static final int NUM_THREADS = 8;
     private static final int MAX_SIZE = 52;
     private static final double RELEASE_PROBABILITY = 0.20;
+    private volatile static HyracksDataException cause = null;
 
-    public FeedMemoryManagerUnitTest(String testName) {
+    public ConcurrentFramePoolUnitTest(String testName) {
         super(testName);
     }
 
@@ -53,7 +56,7 @@
      * @return the suite of tests being tested
      */
     public static Test suite() {
-        return new TestSuite(FeedMemoryManagerUnitTest.class);
+        return new TestSuite(ConcurrentFramePoolUnitTest.class);
     }
 
     @org.junit.Test
@@ -67,6 +70,7 @@
             i++;
         }
         Assert.assertEquals(i, NUM_FRAMES);
+        Assert.assertNull(cause);
     }
 
     @org.junit.Test
@@ -97,6 +101,7 @@
             th.printStackTrace();
             Assert.fail(th.getMessage());
         }
+        Assert.assertNull(cause);
     }
 
     @org.junit.Test
@@ -131,6 +136,7 @@
             th.printStackTrace();
             Assert.fail(th.getMessage());
         }
+        Assert.assertNull(cause);
     }
 
     @org.junit.Test
@@ -170,6 +176,7 @@
             th.printStackTrace();
             Assert.fail(th.getMessage());
         }
+        Assert.assertNull(cause);
     }
 
     @org.junit.Test
@@ -201,6 +208,7 @@
         }
         stack.clear();
         Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
+        Assert.assertNull(cause);
     }
 
     @org.junit.Test
@@ -231,6 +239,7 @@
             th.printStackTrace();
             Assert.fail(th.getMessage());
         }
+        Assert.assertNull(cause);
     }
 
     @org.junit.Test
@@ -281,6 +290,8 @@
         } catch (Throwable th) {
             th.printStackTrace();
             Assert.fail(th.getMessage());
+        } finally {
+            Assert.assertNull(cause);
         }
     }
 
@@ -315,6 +326,125 @@
         } catch (Throwable th) {
             th.printStackTrace();
             Assert.fail(th.getMessage());
+        } finally {
+            Assert.assertNull(cause);
+        }
+    }
+
+    @org.junit.Test
+    public void testFixedSizeSubscribtion() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            int i = 0;
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
+            FrameAction frameAction = new FrameAction();
+            frameAction.setFrame(buffer);
+            while (!fmm.subscribe(frameAction)) {
+                buffers.put(frameAction.retrieve());
+                i++;
+            }
+            // One subscriber.
+            // Check that all frames have been consumed
+            Assert.assertEquals(i, NUM_FRAMES);
+            // Release a frame (That will be handed out to the subscriber)
+            fmm.release(buffers.take());
+            // Check that all frames have been consumed (since the released frame have been handed to the consumer)
+            Assert.assertEquals(0, fmm.remaining());
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        } finally {
+            Assert.assertNull(cause);
+        }
+    }
+
+    @org.junit.Test
+    public void testLargerThanBudgetRequests() {
+        HyracksDataException hde = null;
+        try {
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE);
+            fmm.get(32 * DEFAULT_FRAME_SIZE);
+        } catch (HyracksDataException e) {
+            hde = e;
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+        Assert.assertNotNull(hde);
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testLargerThanBudgetSubscribe() {
+        HyracksDataException hde = null;
+        try {
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE);
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 32);
+            FrameAction frameAction = new FrameAction();
+            frameAction.setFrame(buffer);
+            fmm.subscribe(frameAction);
+        } catch (HyracksDataException e) {
+            hde = e;
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        }
+        Assert.assertNotNull(hde);
+        Assert.assertNull(cause);
+    }
+
+    @org.junit.Test
+    public void testgetWhileSubscribersExist() {
+        try {
+            AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+            Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+            ConcurrentFramePool fmm =
+                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            int i = 0;
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
+            FrameAction frameAction = new FrameAction();
+            frameAction.setFrame(buffer);
+            while (!fmm.subscribe(frameAction)) {
+                buffers.put(frameAction.retrieve());
+                i++;
+            }
+            // One subscriber.
+            // Check that all frames have been consumed
+            Assert.assertEquals(i, NUM_FRAMES);
+            // Release a frame (That will be handed out to the subscriber)
+            fmm.release(buffers.take());
+            // Check that all frames have been consumed (since the released frame have been handed to the consumer)
+            Assert.assertEquals(fmm.remaining(), 0);
+            buffers.put(frameAction.retrieve());
+            // Create another subscriber that takes frames of double the size
+            ByteBuffer bufferTimes2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 2);
+            LinkedBlockingDeque<ByteBuffer> buffersTimes2 = new LinkedBlockingDeque<>();
+            FrameAction frameActionTimes2 = new FrameAction();
+            frameActionTimes2.setFrame(bufferTimes2);
+            Assert.assertEquals(true, fmm.subscribe(frameActionTimes2));
+            // release a small one
+            fmm.release(buffers.take());
+            Assert.assertEquals(fmm.remaining(), 1);
+            // Check that a small get fails
+            Assert.assertEquals(null, fmm.get());
+            // release another small one
+            fmm.release(buffers.take());
+            // Check that no small frames exists in the pool since subscriber request was satisfied
+            Assert.assertEquals(fmm.remaining(), 0);
+            buffersTimes2.add(frameActionTimes2.retrieve());
+            fmm.release(buffers);
+            fmm.release(bufferTimes2);
+            Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail(th.getMessage());
+        } finally {
+            Assert.assertNull(cause);
         }
     }
 
@@ -362,7 +492,8 @@
                         try {
                             fmm.release(stack.pop());
                         } catch (HyracksDataException e) {
-                            Assert.fail();
+                            e.printStackTrace();
+                            cause = e;
                         }
                     }
                 } else {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
new file mode 100644
index 0000000..705d5e3
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -0,0 +1,794 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.ConcurrentFramePool;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.TestControlledFrameWriter;
+import org.apache.hyracks.api.test.TestFrameWriter;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+@RunWith(PowerMockRunner.class)
+public class InputHandlerTest extends TestCase {
+
+    private static final int DEFAULT_FRAME_SIZE = 32768;
+    private static final int NUM_FRAMES = 128;
+    private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE * NUM_FRAMES;
+    private static final String DATAVERSE = "dataverse";
+    private static final String DATASET = "dataset";
+    private static final String FEED = "feed";
+    private static final String NODE_ID = "NodeId";
+    private static final float DISCARD_ALLOWANCE = 0.15f;
+    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1);
+    private volatile static HyracksDataException cause = null;
+
+    public InputHandlerTest(String testName) {
+        super(testName);
+    }
+
+    public static Test suite() {
+        return new TestSuite(InputHandlerTest.class);
+    }
+
+    private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer,
+            FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException {
+        FrameTupleAccessor fta = Mockito.mock(FrameTupleAccessor.class);
+        FeedId feedId = new FeedId(DATAVERSE, FEED);
+        FeedConnectionId connectionId = new FeedConnectionId(feedId, DATASET);
+        FeedRuntimeId runtimeId =
+                new FeedRuntimeId(feedId, FeedRuntimeType.COLLECT, 0, FeedRuntimeId.DEFAULT_TARGET_ID);
+        return new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, fpa, fta, framePool);
+    }
+
+    /*
+     * Testing the following scenarios
+     * 01. Positive Frames memory budget with fixed size frames, no spill, no discard.
+     * 02. Positive Frames memory budget with variable size frames, no spill, no discard.
+     * 03. Positive Frames memory budget with fixed size frames, with spill, no discard.
+     * 04. Positive Frames memory budget with variable size frames, with spill, no discard.
+     * 05. Positive Frames memory budget with fixed size frames, no spill, with discard.
+     * 06. Positive Frames memory budget with variable size frames, no spill, with discard.
+     * 07. Positive Frames memory budget with fixed size frames, with spill, with discard.
+     * 08. Positive Frames memory budget with variable size frames, with spill, with discard.
+     * 09. 0 Frames memory budget with fixed size frames, with spill, no discard.
+     * 10. 0 Frames memory budget with variable size frames, with spill, no discard.
+     * 11. TODO 0 Frames memory budget with fixed size frames, with spill, with discard.
+     * 12. TODO 0 Frames memory budget with variable size frames, with spill, with discard.
+     * 13. TODO Test exception handling with Open, NextFrame,Flush,Close,Fail exception throwing FrameWriter
+     * 14. TODO Test exception while waiting for subscription
+     */
+
+    private static FeedPolicyAccessor createFeedPolicyAccessor(boolean spill, boolean discard, long spillBudget,
+            float discardFraction) {
+        FeedPolicyAccessor fpa = Mockito.mock(FeedPolicyAccessor.class);
+        Mockito.when(fpa.bufferingEnabled()).thenReturn(true);
+        Mockito.when(fpa.spillToDiskOnCongestion()).thenReturn(spill);
+        Mockito.when(fpa.getMaxSpillOnDisk()).thenReturn(spillBudget);
+        Mockito.when(fpa.discardOnCongestion()).thenReturn(discard);
+        Mockito.when(fpa.getMaxFractionDiscard()).thenReturn(discardFraction);
+        return fpa;
+    }
+
+    @org.junit.Test
+    public void testZeroMemoryVarSizeFrameWithDiskNoDiscard() {
+        try {
+            int numRounds = 5;
+            Random random = new Random();
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // No spill, No discard
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+            // Non-Active Writer
+            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            // FramePool
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            handler.nextFrame(buffer);
+            Assert.assertEquals(0, handler.getNumProcessedInMemory());
+            Assert.assertEquals(1, handler.getNumSpilled());
+            // add NUM_FRAMES times
+            for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+                int multiplier = random.nextInt(10) + 1;
+                buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+                handler.nextFrame(buffer);
+            }
+            // Check that no records were discarded
+            Assert.assertEquals(handler.getNumDiscarded(), 0);
+            // Check that no records were spilled
+            Assert.assertEquals(NUM_FRAMES * numRounds + 1, handler.getNumSpilled());
+            writer.validate(false);
+            handler.close();
+            // Check that nextFrame was called
+            Assert.assertEquals(NUM_FRAMES * numRounds + 1, writer.nextFrameCount());
+            writer.validate(true);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        } finally {
+            Assert.assertNull(cause);
+        }
+    }
+
+    @org.junit.Test
+    public void testZeroMemoryFixedSizeFrameWithDiskNoDiscard() {
+        try {
+            int numRounds = 10;
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // No spill, No discard
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+            // Non-Active Writer
+            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            // FramePool
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            VSizeFrame frame = new VSizeFrame(ctx);
+            handler.nextFrame(frame.getBuffer());
+            Assert.assertEquals(0, handler.getNumProcessedInMemory());
+            Assert.assertEquals(1, handler.getNumSpilled());
+            // add NUM_FRAMES times
+            for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            // Check that no records were discarded
+            Assert.assertEquals(handler.getNumDiscarded(), 0);
+            // Check that no records were spilled
+            Assert.assertEquals(NUM_FRAMES * numRounds + 1, handler.getNumSpilled());
+            writer.validate(false);
+            handler.close();
+            // Check that nextFrame was called
+            Assert.assertEquals(NUM_FRAMES * numRounds + 1, writer.nextFrameCount());
+            writer.validate(true);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        } finally {
+            Assert.assertNull(cause);
+        }
+
+    }
+
+    /*
+     * Spill = false;
+     * Discard = true; discard only 5%
+     * Fixed size frames
+     */
+    @org.junit.Test
+    public void testMemoryVarSizeFrameWithSpillWithDiscard() {
+        try {
+            int numberOfMemoryFrames = 50;
+            int numberOfSpillFrames = 50;
+            int notDiscarded = 0;
+            int totalMinFrames = 0;
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // Spill budget = Memory budget, No discard
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
+            // Non-Active Writer
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // FramePool
+            ConcurrentFramePool framePool =
+                    new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            ByteBuffer buffer1 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            ByteBuffer buffer2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 2);
+            ByteBuffer buffer3 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 3);
+            ByteBuffer buffer4 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 4);
+            ByteBuffer buffer5 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 5);
+            while (true) {
+                if (totalMinFrames + 1 < numberOfMemoryFrames) {
+                    handler.nextFrame(buffer1);
+                    notDiscarded++;
+                    totalMinFrames++;
+                } else {
+                    break;
+                }
+                if (totalMinFrames + 2 < numberOfMemoryFrames) {
+                    notDiscarded++;
+                    totalMinFrames += 2;
+                    handler.nextFrame(buffer2);
+                } else {
+                    break;
+                }
+                if (totalMinFrames + 3 < numberOfMemoryFrames) {
+                    notDiscarded++;
+                    totalMinFrames += 3;
+                    handler.nextFrame(buffer3);
+                } else {
+                    break;
+                }
+            }
+            // Now we need to verify that the frame pool memory has been consumed!
+            Assert.assertTrue(framePool.remaining() < 3);
+            Assert.assertEquals(0, handler.getNumSpilled());
+            Assert.assertEquals(0, handler.getNumStalled());
+            Assert.assertEquals(0, handler.getNumDiscarded());
+            while (true) {
+                if (handler.getNumSpilled() < numberOfSpillFrames) {
+                    notDiscarded++;
+                    handler.nextFrame(buffer3);
+                } else {
+                    break;
+                }
+                if (handler.getNumSpilled() < numberOfSpillFrames) {
+                    notDiscarded++;
+                    handler.nextFrame(buffer4);
+                } else {
+                    break;
+                }
+                if (handler.getNumSpilled() < numberOfSpillFrames) {
+                    notDiscarded++;
+                    handler.nextFrame(buffer5);
+                } else {
+                    break;
+                }
+            }
+            Assert.assertTrue(framePool.remaining() < 3);
+            Assert.assertEquals(handler.framesOnDisk(), handler.getNumSpilled());
+            Assert.assertEquals(handler.framesOnDisk(), numberOfSpillFrames);
+            Assert.assertEquals(0, handler.getNumStalled());
+            Assert.assertEquals(0, handler.getNumDiscarded());
+            // We can only discard one frame
+            double numDiscarded = 0;
+            boolean nextShouldDiscard =
+                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            while (nextShouldDiscard) {
+                handler.nextFrame(buffer5);
+                numDiscarded++;
+                nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            }
+            Assert.assertTrue(framePool.remaining() < 3);
+            Assert.assertEquals(handler.framesOnDisk(), handler.getNumSpilled());
+            Assert.assertEquals(0, handler.getNumStalled());
+            Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
+            // Next Call should block since we're exceeding the discard allowance
+            Future<?> result = EXECUTOR.submit(new Pusher(buffer5, handler));
+            if (result.isDone()) {
+                Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
+            }
+            // consume memory frames
+            writer.unfreeze();
+            result.get();
+            handler.close();
+            Assert.assertEquals(writer.nextFrameCount(), notDiscarded + 1);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = true;
+     * Discard = true
+     * Fixed size frames
+     */
+    @org.junit.Test
+    public void testMemoryFixedSizeFrameWithSpillWithDiscard() {
+        try {
+            int numberOfMemoryFrames = 50;
+            int numberOfSpillFrames = 50;
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // Spill budget = Memory budget, No discard
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
+            // Non-Active Writer
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // FramePool
+            ConcurrentFramePool framePool =
+                    new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            VSizeFrame frame = new VSizeFrame(ctx);
+            for (int i = 0; i < numberOfMemoryFrames; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            // Now we need to verify that the frame pool memory has been consumed!
+            Assert.assertEquals(0, framePool.remaining());
+            Assert.assertEquals(numberOfMemoryFrames, handler.getTotal());
+            Assert.assertEquals(0, handler.getNumSpilled());
+            Assert.assertEquals(0, handler.getNumStalled());
+            Assert.assertEquals(0, handler.getNumDiscarded());
+            for (int i = 0; i < numberOfSpillFrames; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            Assert.assertEquals(0, framePool.remaining());
+            Assert.assertEquals(numberOfMemoryFrames + numberOfSpillFrames, handler.getTotal());
+            Assert.assertEquals(numberOfSpillFrames, handler.getNumSpilled());
+            Assert.assertEquals(0, handler.getNumStalled());
+            Assert.assertEquals(0, handler.getNumDiscarded());
+            // We can only discard one frame
+            double numDiscarded = 0;
+            boolean nextShouldDiscard =
+                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            while (nextShouldDiscard) {
+                handler.nextFrame(frame.getBuffer());
+                numDiscarded++;
+                nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            }
+            Assert.assertEquals(0, framePool.remaining());
+            Assert.assertEquals((int) (numberOfMemoryFrames + numberOfSpillFrames + numDiscarded), handler.getTotal());
+            Assert.assertEquals(numberOfSpillFrames, handler.getNumSpilled());
+            Assert.assertEquals(0, handler.getNumStalled());
+            Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
+            // Next Call should block since we're exceeding the discard allowance
+            Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
+            if (result.isDone()) {
+                Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
+            } else {
+                Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
+            }
+            // consume memory frames
+            writer.unfreeze();
+            result.get();
+            handler.close();
+            Assert.assertTrue(result.isDone());
+            Assert.assertEquals(writer.nextFrameCount(), numberOfMemoryFrames + numberOfSpillFrames + 1);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = false;
+     * Discard = true; discard only 5%
+     * Fixed size frames
+     */
+    @org.junit.Test
+    public void testMemoryVariableSizeFrameNoSpillWithDiscard() {
+        try {
+            int discardTestFrames = 100;
+            Random random = new Random();
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // Spill budget = Memory budget, No discard
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+            // Non-Active Writer
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // FramePool
+            ConcurrentFramePool framePool =
+                    new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            // add NUM_FRAMES times
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            int multiplier = 1;
+            int numFrames = 0;
+            // add NUM_FRAMES times
+            while ((multiplier <= framePool.remaining())) {
+                numFrames++;
+                handler.nextFrame(buffer);
+                multiplier = random.nextInt(10) + 1;
+                buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+            }
+            // Next call should NOT block but should discard.
+            double numDiscarded = 0.0;
+            boolean nextShouldDiscard =
+                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            while (nextShouldDiscard) {
+                handler.nextFrame(buffer);
+                numDiscarded++;
+                nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            }
+            Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
+            if (result.isDone()) {
+                Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
+            } else {
+                // Check that no records were discarded
+                assertEquals((int) numDiscarded, handler.getNumDiscarded());
+                // Check that one frame is spilled
+                assertEquals(handler.getNumSpilled(), 0);
+            }
+            // consume memory frames
+            writer.unfreeze();
+            result.get();
+            handler.close();
+            Assert.assertEquals(writer.nextFrameCount(), numFrames + 1);
+            // exit
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = false;
+     * Discard = true; discard only 5%
+     * Fixed size frames
+     */
+    @org.junit.Test
+    public void testMemoryFixedSizeFrameNoSpillWithDiscard() {
+        try {
+            int discardTestFrames = 100;
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // Spill budget = Memory budget, No discard
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+            // Non-Active Writer
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // FramePool
+            ConcurrentFramePool framePool =
+                    new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            VSizeFrame frame = new VSizeFrame(ctx);
+            // add NUM_FRAMES times
+            for (int i = 0; i < discardTestFrames; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            // Next 5 calls call should NOT block but should discard.
+            double numDiscarded = 0.0;
+            boolean nextShouldDiscard =
+                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            while (nextShouldDiscard) {
+                handler.nextFrame(frame.getBuffer());
+                numDiscarded++;
+                nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            }
+            // Next Call should block since we're exceeding the discard allowance
+            Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
+            if (result.isDone()) {
+                Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
+            } else {
+                // Check that no records were discarded
+                assertEquals((int) numDiscarded, handler.getNumDiscarded());
+                // Check that one frame is spilled
+                assertEquals(handler.getNumSpilled(), 0);
+            }
+            // consume memory frames
+            writer.unfreeze();
+            result.get();
+            handler.close();
+            Assert.assertEquals(writer.nextFrameCount(), discardTestFrames + 1);
+            // exit
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = true;
+     * Discard = false;
+     * Fixed size frames
+     */
+    @org.junit.Test
+    public void testMemoryFixedSizeFrameWithSpillNoDiscard() {
+        try {
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // Spill budget = Memory budget, No discard
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
+            // Non-Active Writer
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // FramePool
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            VSizeFrame frame = new VSizeFrame(ctx);
+            // add NUM_FRAMES times
+            for (int i = 0; i < NUM_FRAMES; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            // Next call should NOT block. we will do it in a different thread
+            Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
+            result.get();
+            // Check that no records were discarded
+            assertEquals(handler.getNumDiscarded(), 0);
+            // Check that one frame is spilled
+            assertEquals(handler.getNumSpilled(), 1);
+            // consume memory frames
+            writer.unfreeze();
+            handler.close();
+            Assert.assertEquals(handler.framesOnDisk(), 0);
+            // exit
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = false;
+     * Discard = false;
+     * Fixed size frames
+     * Very fast next operator
+     */
+    @org.junit.Test
+    public void testMemoryFixedSizeFrameNoDiskNoDiscardFastConsumer() {
+        try {
+            int numRounds = 10;
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // No spill, No discard
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
+            // Non-Active Writer
+            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            // FramePool
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            VSizeFrame frame = new VSizeFrame(ctx);
+            // add NUM_FRAMES times
+            for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            // Check that no records were discarded
+            Assert.assertEquals(handler.getNumDiscarded(), 0);
+            // Check that no records were spilled
+            Assert.assertEquals(handler.getNumSpilled(), 0);
+            writer.validate(false);
+            handler.close();
+            // Check that nextFrame was called
+            Assert.assertEquals(NUM_FRAMES * numRounds, writer.nextFrameCount());
+            writer.validate(true);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = false;
+     * Discard = false;
+     * Fixed size frames
+     * Slow next operator
+     */
+    @org.junit.Test
+    public void testMemoryFixedSizeFrameNoDiskNoDiscardSlowConsumer() {
+        try {
+            int numRounds = 10;
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // No spill, No discard
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
+            // Non-Active Writer
+            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            // FramePool
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            VSizeFrame frame = new VSizeFrame(ctx);
+            writer.setNextDuration(1);
+            // add NUM_FRAMES times
+            for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            // Check that no records were discarded
+            Assert.assertEquals(handler.getNumDiscarded(), 0);
+            // Check that no records were spilled
+            Assert.assertEquals(handler.getNumSpilled(), 0);
+            // Check that nextFrame was called
+            writer.validate(false);
+            handler.close();
+            Assert.assertEquals(writer.nextFrameCount(), (NUM_FRAMES * numRounds));
+            writer.validate(true);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = false
+     * Discard = false
+     * VarSizeFrame
+     */
+    public void testMemoryVarSizeFrameNoDiskNoDiscard() {
+        try {
+            Random random = new Random();
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // No spill, No discard
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
+            // Non-Active Writer
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // FramePool
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            int multiplier = 1;
+            // add NUM_FRAMES times
+            while ((multiplier <= framePool.remaining())) {
+                handler.nextFrame(buffer);
+                multiplier = random.nextInt(10) + 1;
+                buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+            }
+            // we can't satisfy the next request
+            // Next call should block we will do it in a different thread
+            Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
+            // Check that the nextFrame didn't return
+            if (result.isDone()) {
+                Assert.fail();
+            }
+            // Check that no records were discarded
+            assertEquals(handler.getNumDiscarded(), 0);
+            // Check that no records were spilled
+            assertEquals(handler.getNumSpilled(), 0);
+            // Check that number of stalled is not greater than 1
+            Assert.assertTrue(handler.getNumStalled() <= 1);
+            writer.unfreeze();
+            result.get();
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = true;
+     * Discard = false;
+     * Variable size frames
+     */
+    @org.junit.Test
+    public void testMemoryVarSizeFrameWithSpillNoDiscard() {
+        try {
+            Random random = new Random();
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // Spill budget = Memory budget, No discard
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
+            // Non-Active Writer
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // FramePool
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+            int multiplier = 1;
+            // add NUM_FRAMES times
+            while ((multiplier <= framePool.remaining())) {
+                handler.nextFrame(buffer);
+                multiplier = random.nextInt(10) + 1;
+                buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+            }
+            // Next call should Not block. we will do it in a different thread
+            Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
+            result.get();
+            // Check that no records were discarded
+            assertEquals(handler.getNumDiscarded(), 0);
+            // Check that one frame is spilled
+            assertEquals(handler.getNumSpilled(), 1);
+            // consume memory frames
+            while (!handler.getInternalBuffer().isEmpty()) {
+                writer.kick();
+            }
+            // There should be 1 frame on disk
+            Assert.assertEquals(1, handler.framesOnDisk());
+            writer.unfreeze();
+            result.get();
+            Assert.assertEquals(0, handler.framesOnDisk());
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    /*
+     * Spill = false;
+     * Discard = false;
+     * Fixed size frames
+     */
+    @org.junit.Test
+    public void testMemoryFixedSizeFrameNoDiskNoDiscard() {
+        try {
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            // No spill, No discard
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
+            // Non-Active Writer
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            writer.freeze();
+            // FramePool
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+
+            FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+            handler.open();
+            VSizeFrame frame = new VSizeFrame(ctx);
+            // add NUM_FRAMES times
+            for (int i = 0; i < NUM_FRAMES; i++) {
+                handler.nextFrame(frame.getBuffer());
+            }
+            // Next call should block we will do it in a different thread
+            Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
+            // Check that the nextFrame didn't return
+            if (result.isDone()) {
+                Assert.fail();
+            } else {
+                // Check that no records were discarded
+                Assert.assertEquals(handler.getNumDiscarded(), 0);
+                // Check that no records were spilled
+                Assert.assertEquals(handler.getNumSpilled(), 0);
+                // Check that no records were discarded
+                // Check that the inputHandler subscribed to the framePool
+                // Check that number of stalled is not greater than 1
+                Assert.assertTrue(handler.getNumStalled() <= 1);
+                writer.kick();
+            }
+            result.get();
+            writer.unfreeze();
+            handler.close();
+        } catch (Throwable th) {
+            th.printStackTrace();
+            Assert.fail();
+        }
+        Assert.assertNull(cause);
+    }
+
+    private class Pusher implements Runnable {
+        private final ByteBuffer buffer;
+        private final IFrameWriter writer;
+
+        public Pusher(ByteBuffer buffer, IFrameWriter writer) {
+            this.buffer = buffer;
+            this.writer = writer;
+        }
+
+        @Override
+        public void run() {
+            try {
+                writer.nextFrame(buffer);
+            } catch (HyracksDataException e) {
+                e.printStackTrace();
+                cause = e;
+            }
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 3961921..9336921 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -39,7 +39,23 @@
   <properties>
     <root.dir>${basedir}/../..</root.dir>
   </properties>
-
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
   <dependencies>
     <dependency>
       <groupId>org.json</groupId>
@@ -77,5 +93,22 @@
       <artifactId>hyracks-util</artifactId>
       <version>0.2.18-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>2.0.2-beta</version>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito</artifactId>
+      <version>1.6.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>1.6.2</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java
new file mode 100644
index 0000000..19998a7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.mockito.invocation.InvocationOnMock;
+
+public class CountAndThrowError extends CountAnswer {
+    private String errorMessage;
+
+    public CountAndThrowError(String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+
+    @Override
+    public Object call() throws HyracksDataException {
+        count++;
+        throw new UnknownError(errorMessage);
+    }
+
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+        count++;
+        throw new UnknownError(errorMessage);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java
new file mode 100644
index 0000000..5a5ad59
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.mockito.invocation.InvocationOnMock;
+
+public class CountAndThrowException extends CountAnswer {
+    private String errorMessage;
+
+    public CountAndThrowException(String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+
+    @Override
+    public Object call() throws HyracksDataException {
+        count++;
+        throw new HyracksDataException(errorMessage);
+    }
+
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+        count++;
+        throw new HyracksDataException(errorMessage);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java
new file mode 100644
index 0000000..e8a6654
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class CountAnswer implements Answer<Object> {
+    protected int count = 0;
+
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+        count++;
+        return null;
+    }
+
+    public Object call() throws HyracksDataException {
+        count++;
+        return null;
+    }
+
+    public int getCallCount() {
+        return count;
+    }
+
+    public void reset() {
+        count = 0;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
new file mode 100644
index 0000000..4bddfa9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import java.util.Collection;
+
+public class FrameWriterTestUtils {
+    public static final String EXCEPTION_MESSAGE = "IFrameWriter Exception in the call to the method ";
+    public static final String ERROR_MESSAGE = "IFrameWriter Error in the call to the method ";
+
+    public enum FrameWriterOperation {
+        Open,
+        NextFrame,
+        Fail,
+        Flush,
+        Close
+    }
+
+    public static TestFrameWriter create(Collection<FrameWriterOperation> exceptionThrowingOperations,
+            Collection<FrameWriterOperation> errorThrowingOperations) {
+        CountAnswer openAnswer =
+                createAnswer(FrameWriterOperation.Open, exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer nextAnswer =
+                createAnswer(FrameWriterOperation.NextFrame, exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer flushAnswer =
+                createAnswer(FrameWriterOperation.Flush, exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer failAnswer =
+                createAnswer(FrameWriterOperation.Fail, exceptionThrowingOperations, errorThrowingOperations);
+        CountAnswer closeAnswer =
+                createAnswer(FrameWriterOperation.Close, exceptionThrowingOperations, errorThrowingOperations);
+        return new TestFrameWriter(openAnswer, nextAnswer, flushAnswer, failAnswer, closeAnswer);
+    }
+
+    public static CountAnswer createAnswer(FrameWriterOperation operation,
+            Collection<FrameWriterOperation> exceptionThrowingOperations,
+            Collection<FrameWriterOperation> errorThrowingOperations) {
+        if (exceptionThrowingOperations.contains(operation)) {
+            return new CountAndThrowException(EXCEPTION_MESSAGE + operation.toString());
+        } else if (exceptionThrowingOperations.contains(operation)) {
+            return new CountAndThrowError(ERROR_MESSAGE + operation.toString());
+        } else {
+            return new CountAnswer();
+        }
+    }
+
+    public static TestControlledFrameWriter create(int initialFrameSize) {
+        return new TestControlledFrameWriter(initialFrameSize);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
new file mode 100644
index 0000000..2a3f70d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class TestControlledFrameWriter extends TestFrameWriter {
+    private boolean frozen = false;
+    private boolean timed = false;
+    private long duration = Long.MAX_VALUE;
+    private final int initialFrameSize;
+    private volatile int currentMultiplier = 0;
+    private volatile int kicks = 0;
+
+    public TestControlledFrameWriter(int initialFrameSize) {
+        super(new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer());
+        this.initialFrameSize = initialFrameSize;
+    }
+
+    public int getCurrentMultiplier() {
+        return currentMultiplier;
+    }
+
+    public synchronized void freeze() {
+        frozen = true;
+    }
+
+    public synchronized void time(long ms) {
+        frozen = true;
+        timed = true;
+        duration = ms;
+    }
+
+    public synchronized void unfreeze() {
+        frozen = false;
+        notify();
+    }
+
+    public synchronized void kick() {
+        kicks++;
+        notify();
+    }
+
+    @Override
+    public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        super.nextFrame(buffer);
+        currentMultiplier = buffer.capacity() / initialFrameSize;
+        if (frozen) {
+            try {
+                if (timed) {
+                    wait(duration);
+                } else {
+                    while (frozen && kicks == 0) {
+                        wait();
+                    }
+                    kicks--;
+                }
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        currentMultiplier = 0;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
new file mode 100644
index 0000000..b3492fe
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class TestFrameWriter implements IFrameWriter {
+    private final CountAnswer openAnswer;
+    private final CountAnswer nextAnswer;
+    private final CountAnswer flushAnswer;
+    private final CountAnswer failAnswer;
+    private final CountAnswer closeAnswer;
+    private long openDuration = 0L;
+    private long nextDuration = 0L;
+    private long flushDuration = 0L;
+    private long failDuration = 0L;
+    private long closeDuration = 0L;
+
+    public TestFrameWriter(CountAnswer openAnswer, CountAnswer nextAnswer, CountAnswer flushAnswer,
+            CountAnswer failAnswer, CountAnswer closeAnswer) {
+        this.openAnswer = openAnswer;
+        this.nextAnswer = nextAnswer;
+        this.closeAnswer = closeAnswer;
+        this.flushAnswer = flushAnswer;
+        this.failAnswer = failAnswer;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        delay(openDuration);
+        openAnswer.call();
+    }
+
+    public int openCount() {
+        return openAnswer.getCallCount();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        delay(nextDuration);
+        nextAnswer.call();
+    }
+
+    public int nextFrameCount() {
+        return nextAnswer.getCallCount();
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        delay(flushDuration);
+        flushAnswer.call();
+    }
+
+    public int flushCount() {
+        return flushAnswer.getCallCount();
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        delay(failDuration);
+        failAnswer.call();
+    }
+
+    public int failCount() {
+        return failAnswer.getCallCount();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        delay(closeDuration);
+        closeAnswer.call();
+    }
+
+    public int closeCount() {
+        return closeAnswer.getCallCount();
+    }
+
+    public synchronized boolean validate(boolean finished) {
+        if (failAnswer.getCallCount() > 1 || closeAnswer.getCallCount() > 1 || openAnswer.getCallCount() > 1) {
+            return false;
+        }
+        if (openAnswer.getCallCount() == 0
+                && (nextAnswer.getCallCount() > 0 || failAnswer.getCallCount() > 0 || closeAnswer.getCallCount() > 0)) {
+            return false;
+        }
+        if (finished) {
+            if (closeAnswer.getCallCount() == 0 && (nextAnswer.getCallCount() > 0 || failAnswer.getCallCount() > 0
+                    || openAnswer.getCallCount() > 0)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public void reset() {
+        openAnswer.reset();
+        nextAnswer.reset();
+        flushAnswer.reset();
+        failAnswer.reset();
+        closeAnswer.reset();
+    }
+
+    public long getOpenDuration() {
+        return openDuration;
+    }
+
+    public void setOpenDuration(long openDuration) {
+        this.openDuration = openDuration;
+    }
+
+    public long getNextDuration() {
+        return nextDuration;
+    }
+
+    public void setNextDuration(long nextDuration) {
+        this.nextDuration = nextDuration;
+    }
+
+    public long getFlushDuration() {
+        return flushDuration;
+    }
+
+    public void setFlushDuration(long flushDuration) {
+        this.flushDuration = flushDuration;
+    }
+
+    public long getFailDuration() {
+        return failDuration;
+    }
+
+    public void setFailDuration(long failDuration) {
+        this.failDuration = failDuration;
+    }
+
+    public long getCloseDuration() {
+        return closeDuration;
+    }
+
+    public void setCloseDuration(long closeDuration) {
+        this.closeDuration = closeDuration;
+    }
+
+    private void delay(long duration) throws HyracksDataException {
+        if (duration > 0) {
+            try {
+                synchronized (this) {
+                    wait(duration);
+                }
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
index 581fde4..d07d633 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
@@ -39,6 +39,13 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-api</artifactId>
+      <version>0.2.18-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-storage-common</artifactId>
       <version>0.2.18-SNAPSHOT</version>
       <type>jar</type>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
index df3a211..d3e7a3a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -30,6 +30,9 @@
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.CountAndThrowError;
+import org.apache.hyracks.api.test.CountAndThrowException;
+import org.apache.hyracks.api.test.CountAnswer;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -92,8 +95,8 @@
     public boolean validate(boolean finished) {
         // get number of open calls
         int openCount = openException.getCallCount() + openNormal.getCallCount() + openError.getCallCount();
-        int nextFrameCount = nextFrameException.getCallCount() + nextFrameNormal.getCallCount()
-                + nextFrameError.getCallCount();
+        int nextFrameCount =
+                nextFrameException.getCallCount() + nextFrameNormal.getCallCount() + nextFrameError.getCallCount();
         int failCount = failException.getCallCount() + failNormal.getCallCount() + failError.getCallCount();
         int closeCount = closeException.getCallCount() + closeNormal.getCallCount() + closeError.getCallCount();
 
@@ -422,8 +425,9 @@
     public AbstractTreeIndexOperatorDescriptor[] mockIndexOpDesc() throws HyracksDataException, IndexException {
         IIndexDataflowHelperFactory[] indexDataflowHelperFactories = mockIndexHelperFactories();
         ISearchOperationCallbackFactory[] searchOpCallbackFactories = mockSearchOpCallbackFactories();
-        AbstractTreeIndexOperatorDescriptor[] opDescs = new AbstractTreeIndexOperatorDescriptor[indexDataflowHelperFactories.length
-                * searchOpCallbackFactories.length];
+        AbstractTreeIndexOperatorDescriptor[] opDescs =
+                new AbstractTreeIndexOperatorDescriptor[indexDataflowHelperFactories.length
+                        * searchOpCallbackFactories.length];
         int k = 0;
         for (int i = 0; i < indexDataflowHelperFactories.length; i++) {
             for (int j = 0; j < searchOpCallbackFactories.length; j++) {
@@ -452,52 +456,6 @@
         return opCallback;
     }
 
-    public class CountAnswer implements Answer<Object> {
-        protected int count = 0;
-
-        @Override
-        public Object answer(InvocationOnMock invocation) throws Throwable {
-            count++;
-            return null;
-        }
-
-        public int getCallCount() {
-            return count;
-        }
-
-        public void reset() {
-            count = 0;
-        }
-    }
-
-    public class CountAndThrowException extends CountAnswer {
-        private String errorMessage;
-
-        public CountAndThrowException(String errorMessage) {
-            this.errorMessage = errorMessage;
-        }
-
-        @Override
-        public Object answer(InvocationOnMock invocation) throws Throwable {
-            count++;
-            throw new HyracksDataException(errorMessage);
-        }
-    }
-
-    public class CountAndThrowError extends CountAnswer {
-        private String errorMessage;
-
-        public CountAndThrowError(String errorMessage) {
-            this.errorMessage = errorMessage;
-        }
-
-        @Override
-        public Object answer(InvocationOnMock invocation) throws Throwable {
-            count++;
-            throw new UnknownError(errorMessage);
-        }
-    }
-
     public IFrameWriter[] createOutputWriters() throws Exception {
         CountAnswer[] opens = new CountAnswer[] { openNormal, openException, openError };
         CountAnswer[] nextFrames = new CountAnswer[] { nextFrameNormal, nextFrameException, nextFrameError };