ASTERIXDB-1883: FeedRuntimeInputHandler issues

Recent commit https://asterix-gerrit.ics.uci.edu/#/c/1591/ includes a
number of new issues in FeedRuntimeInputHandler:
- hangs caused by race condition with mutex & inbox on close (observed
  on Jenkins)
- CPU spin on disk spilling on empty inbox
- The writer is not flushed in as many cases as before

Change-Id: I7e091f65eb5f3a76277803b3197d490d3ef2fc04
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1677
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 329451d..11561b5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.feed.dataflow;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -51,7 +52,10 @@
     private static final Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
     private static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8;
     private static final boolean DEBUG = false;
-    private final Object mutex = new Object();
+    private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0);
+    private static final ByteBuffer SPILLED = ByteBuffer.allocate(0);
+    private static final ByteBuffer FAIL = ByteBuffer.allocate(0);
+
     private final FeedExceptionHandler exceptionHandler;
     private final FrameSpiller spiller;
     private final FeedPolicyAccessor fpa;
@@ -59,7 +63,7 @@
     private final int initialFrameSize;
     private final FrameTransporter consumer;
     private final Thread consumerThread;
-    private final LinkedBlockingQueue<ByteBuffer> inbox;
+    private final BlockingQueue<ByteBuffer> inbox;
     private final ConcurrentFramePool framePool;
     private Mode mode = Mode.PROCESS;
     private int total = 0;
@@ -90,16 +94,24 @@
 
     @Override
     public void open() throws HyracksDataException {
-        synchronized (writer) {
-            writer.open();
-            consumerThread.start();
-        }
+        writer.open();
+        consumerThread.start();
     }
 
     @Override
     public void fail() throws HyracksDataException {
-        synchronized (writer) {
-            writer.fail();
+        ByteBuffer buffer = inbox.poll();
+        while (buffer != null) {
+            if (buffer != SPILLED) {
+                framePool.release(buffer);
+            }
+            buffer = inbox.poll();
+        }
+        try {
+            inbox.put(FAIL);
+        } catch (InterruptedException e) {
+            LOGGER.log(Level.WARNING, "interrupted", e);
+            Thread.currentThread().interrupt();
         }
     }
 
@@ -110,28 +122,18 @@
             // If we use nextframe, chances are this frame will also be
             // flushed into the spilled file. This causes problem when trying to
             // read the frame and the size info is lost.
-            inbox.put(ByteBuffer.allocate(0));
-            synchronized (mutex) {
-                if (DEBUG) {
-                    LOGGER.info("Producer is waking up consumer");
-                }
-                mutex.notify();
-            }
+            inbox.put(POISON_PILL);
             consumerThread.join();
         } catch (InterruptedException e) {
-            LOGGER.log(Level.WARNING, e.getMessage(), e);
-        }
-        try {
-            framePool.release(inbox);
-        } catch (Throwable th) {
-            LOGGER.log(Level.WARNING, th.getMessage(), th);
+            LOGGER.log(Level.WARNING, "interrupted", e);
+            Thread.currentThread().interrupt();
         }
         try {
             if (spiller != null) {
                 spiller.close();
             }
         } catch (Throwable th) {
-            LOGGER.log(Level.WARNING, th.getMessage(), th);
+            LOGGER.log(Level.WARNING, "exception closing spiller", th);
         } finally {
             writer.close();
         }
@@ -163,8 +165,11 @@
                     }
                     break;
             }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw HyracksDataException.create(e);
         } catch (Throwable th) {
-            throw new HyracksDataException(th);
+            throw HyracksDataException.create(th);
         }
     }
 
@@ -182,7 +187,7 @@
         }
     }
 
-    private void discard(ByteBuffer frame) throws HyracksDataException {
+    private void discard(ByteBuffer frame) throws HyracksDataException, InterruptedException {
         if (DEBUG) {
             LOGGER.info("starting discard(frame)");
         }
@@ -206,7 +211,7 @@
                 }
                 numProcessedInMemory++;
                 next.put(frame);
-                inbox.offer(next);
+                inbox.put(next);
                 mode = Mode.PROCESS;
                 return;
             }
@@ -224,7 +229,7 @@
         }
     }
 
-    private void exitProcessState(ByteBuffer frame) throws HyracksDataException {
+    private void exitProcessState(ByteBuffer frame) throws HyracksDataException, InterruptedException {
         if (fpa.spillToDiskOnCongestion()) {
             mode = Mode.SPILL;
             spiller.open();
@@ -237,7 +242,7 @@
         }
     }
 
-    private void discardOrStall(ByteBuffer frame) throws HyracksDataException {
+    private void discardOrStall(ByteBuffer frame) throws HyracksDataException, InterruptedException {
         if (fpa.discardOnCongestion()) {
             mode = Mode.DISCARD;
             discard(frame);
@@ -249,48 +254,33 @@
         }
     }
 
-    private void stall(ByteBuffer frame) throws HyracksDataException {
-        try {
+    private void stall(ByteBuffer frame) throws HyracksDataException, InterruptedException {
+        if (DEBUG) {
+            LOGGER.info("in stall(frame). So far, I have stalled " + numStalled);
+        }
+        numStalled++;
+        // If spilling is enabled, we wait on the spiller
+        if (fpa.spillToDiskOnCongestion()) {
             if (DEBUG) {
-                LOGGER.info("in stall(frame). So far, I have stalled " + numStalled);
+                LOGGER.info("in stall(frame). Spilling is enabled so we will attempt to spill");
             }
-            numStalled++;
-            // If spilling is enabled, we wait on the spiller
-            if (fpa.spillToDiskOnCongestion()) {
-                if (DEBUG) {
-                    LOGGER.info("in stall(frame). Spilling is enabled so we will attempt to spill");
-                }
-                waitforSpillSpace();
-                spiller.spill(frame);
-                numSpilled++;
-                synchronized (mutex) {
-                    if (DEBUG) {
-                        LOGGER.info("Producer is waking up consumer");
-                    }
-                    mutex.notify();
-                }
-                return;
-            }
-            if (DEBUG) {
-                LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to frame pool");
-            }
-            // Spilling is disabled, we subscribe to feedMemoryManager
-            frameAction.setFrame(frame);
-            framePool.subscribe(frameAction);
-            ByteBuffer temp = frameAction.retrieve();
-            inbox.put(temp);
-            numProcessedInMemory++;
-            if (DEBUG) {
-                LOGGER.info("stall(frame) has been completed. Notifying the consumer that a frame is ready");
-            }
-            synchronized (mutex) {
-                if (DEBUG) {
-                    LOGGER.info("Producer is waking up consumer");
-                }
-                mutex.notify();
-            }
-        } catch (InterruptedException e) {
-            throw new HyracksDataException(e);
+            waitforSpillSpace();
+            spiller.spill(frame);
+            numSpilled++;
+            inbox.put(SPILLED);
+            return;
+        }
+        if (DEBUG) {
+            LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to frame pool");
+        }
+        // Spilling is disabled, we subscribe to feedMemoryManager
+        frameAction.setFrame(frame);
+        framePool.subscribe(frameAction);
+        ByteBuffer temp = frameAction.retrieve();
+        inbox.put(temp);
+        numProcessedInMemory++;
+        if (DEBUG) {
+            LOGGER.info("stall(frame) has been completed. Notifying the consumer that a frame is ready");
         }
     }
 
@@ -307,19 +297,14 @@
         }
     }
 
-    private void process(ByteBuffer frame) throws HyracksDataException {
+    private void process(ByteBuffer frame) throws HyracksDataException, InterruptedException {
         // Get a page from frame pool
         ByteBuffer next = (frame.capacity() <= framePool.getMaxFrameSize()) ? getFreeBuffer(frame.capacity()) : null;
         if (next != null) {
             // Got a page from memory pool
             numProcessedInMemory++;
             next.put(frame);
-            try {
-                inbox.put(next);
-                notifyMemoryConsumer();
-            } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
-            }
+            inbox.put(next);
         } else {
             if (DEBUG) {
                 LOGGER.info("Couldn't allocate memory --> exitProcessState(frame)");
@@ -329,46 +314,29 @@
         }
     }
 
-    private void notifyMemoryConsumer() {
-        if (inbox.size() == 1) {
-            synchronized (mutex) {
-                if (DEBUG) {
-                    LOGGER.info("Producer is waking up consumer");
-                }
-                mutex.notify();
-            }
-        }
-    }
-
-    private void spill(ByteBuffer frame) throws HyracksDataException {
+    private void spill(ByteBuffer frame) throws HyracksDataException, InterruptedException {
         if (spiller.switchToMemory()) {
-            synchronized (mutex) {
-                // Check if there is memory
-                ByteBuffer next = null;
-                if (frame.capacity() <= framePool.getMaxFrameSize()) {
-                    next = getFreeBuffer(frame.capacity());
-                }
-                if (next != null) {
-                    spiller.close();
-                    numProcessedInMemory++;
-                    next.put(frame);
-                    inbox.offer(next);
-                    notifyMemoryConsumer();
-                    mode = Mode.PROCESS;
-                } else {
-                    // spill. This will always succeed since spilled = 0 (TODO must verify that budget can't be 0)
-                    spiller.spill(frame);
-                    numSpilled++;
-                    if (DEBUG) {
-                        LOGGER.info("Producer is waking up consumer");
-                    }
-                    mutex.notify();
-                }
+            // Check if there is memory
+            ByteBuffer next = null;
+            if (frame.capacity() <= framePool.getMaxFrameSize()) {
+                next = getFreeBuffer(frame.capacity());
+            }
+            if (next != null) {
+                spiller.close();
+                numProcessedInMemory++;
+                next.put(frame);
+                inbox.put(next);
+                mode = Mode.PROCESS;
+            } else {
+                // spill. This will always succeed since spilled = 0 (TODO must verify that budget can't be 0)
+                spiller.spill(frame);
+                numSpilled++;
+                inbox.put(SPILLED);
             }
         } else {
             // try to spill. If failed switch to either discard or stall
             if (spiller.spill(frame)) {
-                notifyDiskConsumer();
+                inbox.put(SPILLED);
                 numSpilled++;
             } else {
                 if (fpa.discardOnCongestion()) {
@@ -381,22 +349,6 @@
         }
     }
 
-    private void notifyDiskConsumer() {
-        if (spiller.remaining() == 1) {
-            synchronized (mutex) {
-                if (DEBUG) {
-                    LOGGER.info("Producer is waking up consumer");
-                }
-                mutex.notify();
-            }
-        }
-    }
-
-    @Override
-    public void flush() throws HyracksDataException {
-        // no op
-    }
-
     public int getNumDiscarded() {
         return numDiscarded;
     }
@@ -460,37 +412,28 @@
                 boolean running = true;
                 while (running) {
                     frame = inbox.poll();
-
-                    if (frame == null && spiller != null) {
-                        running = clearLocalFrames();
-                        continue;
-                    }
-
                     if (frame == null) {
-                        synchronized (mutex) {
-                            LOGGER.info("Consumer is going to sleep");
-                            mutex.wait();
-                            LOGGER.info("Consumer is waking up");
-                        }
-                        continue;
+                        writer.flush();
+                        frame = inbox.take();
                     }
-
-                    // process
-                    if (frame.capacity() == 0) {
+                    if (frame == SPILLED) {
+                        running = clearLocalFrames();
+                    } else if (frame == POISON_PILL) {
                         running = false;
-                        if (spiller != null ) {
+                        if (spiller != null) {
                             clearLocalFrames();
                         }
+                    } else if (frame == FAIL) {
+                        running = false;
+                        writer.fail();
                     } else {
+                        // process
                         try {
-                            if (consume(frame) != null) {
-                                return;
-                            }
+                            running = consume(frame) == null;
                         } finally {
                             framePool.release(frame);
                         }
                     }
-                    writer.flush();
                 }
             } catch (Throwable th) {
                 this.cause = th;
@@ -507,7 +450,7 @@
         return total;
     }
 
-    public LinkedBlockingQueue<ByteBuffer> getInternalBuffer() {
+    public BlockingQueue<ByteBuffer> getInternalBuffer() {
         return inbox;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index 2237505..b6343b9 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -48,14 +48,13 @@
 import org.apache.hyracks.api.test.TestFrameWriter;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.test.support.TestUtils;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 import org.mockito.Mockito;
 
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-public class InputHandlerTest extends TestCase {
+public class InputHandlerTest {
 
     private static final int DEFAULT_FRAME_SIZE = 32768;
     private static final int NUM_FRAMES = 128;
@@ -68,14 +67,6 @@
     private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1);
     private volatile static HyracksDataException cause = null;
 
-    public InputHandlerTest(String testName) {
-        super(testName);
-    }
-
-    public static Test suite() {
-        return new TestSuite(InputHandlerTest.class);
-    }
-
     private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer,
             FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException {
         FrameTupleAccessor fta = Mockito.mock(FrameTupleAccessor.class);
@@ -122,12 +113,12 @@
         }
     }
 
-    @org.junit.Before
+    @Before
     public void testCleanBefore() throws IOException {
         cleanDiskFiles();
     }
 
-    @org.junit.After
+    @After
     public void testCleanAfter() throws IOException {
         cleanDiskFiles();
     }
@@ -139,11 +130,11 @@
             Random random = new Random();
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // No spill, No discard
-            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE,
-                    DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
-                    false);
+            TestFrameWriter writer =
+                    FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -175,17 +166,17 @@
         }
     }
 
-    @org.junit.Test
+    @Test
     public void testZeroMemoryFixedSizeFrameWithDiskNoDiscard() {
         try {
             int numRounds = 10;
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // No spill, No discard
-            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE,
-                    DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
-                    false);
+            TestFrameWriter writer =
+                    FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -213,7 +204,6 @@
         } finally {
             Assert.assertNull(cause);
         }
-
     }
 
     /*
@@ -221,7 +211,7 @@
      * Discard = true; discard only 5%
      * Fixed size frames
      */
-    @org.junit.Test
+    @Test
     public void testMemoryVarSizeFrameWithSpillWithDiscard() {
         try {
             int numberOfMemoryFrames = 50;
@@ -230,14 +220,14 @@
             int totalMinFrames = 0;
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // Spill budget = Memory budget, No discard
-            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames,
-                    DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
             // Non-Active Writer
             TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
-            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE,
-                    DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool framePool =
+                    new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
             handler.open();
             ByteBuffer buffer1 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
@@ -300,8 +290,8 @@
             Assert.assertEquals(0, handler.getNumDiscarded());
             // We can only discard one frame
             double numDiscarded = 0;
-            boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa
-                    .getMaxFractionDiscard();
+            boolean nextShouldDiscard =
+                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
             while (nextShouldDiscard) {
                 handler.nextFrame(buffer5);
                 numDiscarded++;
@@ -333,21 +323,21 @@
      * Discard = true
      * Fixed size frames
      */
-    @org.junit.Test
+    @Test
     public void testMemoryFixedSizeFrameWithSpillWithDiscard() {
         try {
             int numberOfMemoryFrames = 50;
             int numberOfSpillFrames = 50;
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // Spill budget = Memory budget, No discard
-            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames,
-                    DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
             // Non-Active Writer
             TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
-            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE,
-                    DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool framePool =
+                    new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
             handler.open();
             VSizeFrame frame = new VSizeFrame(ctx);
@@ -370,8 +360,8 @@
             Assert.assertEquals(0, handler.getNumDiscarded());
             // We can only discard one frame
             double numDiscarded = 0;
-            boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa
-                    .getMaxFractionDiscard();
+            boolean nextShouldDiscard =
+                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
             while (nextShouldDiscard) {
                 handler.nextFrame(frame.getBuffer());
                 numDiscarded++;
@@ -407,7 +397,7 @@
      * Discard = true; discard only 5%
      * Fixed size frames
      */
-    @org.junit.Test
+    @Test
     public void testMemoryVariableSizeFrameNoSpillWithDiscard() {
         try {
             int discardTestFrames = 100;
@@ -419,8 +409,8 @@
             TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
-            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE,
-                    DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool framePool =
+                    new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
             handler.open();
             // add NUM_FRAMES times
@@ -436,8 +426,8 @@
             }
             // Next call should NOT block but should discard.
             double numDiscarded = 0.0;
-            boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa
-                    .getMaxFractionDiscard();
+            boolean nextShouldDiscard =
+                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
             while (nextShouldDiscard) {
                 handler.nextFrame(buffer);
                 numDiscarded++;
@@ -448,9 +438,9 @@
                 Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
             } else {
                 // Check that no records were discarded
-                assertEquals((int) numDiscarded, handler.getNumDiscarded());
+                Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
                 // Check that one frame is spilled
-                assertEquals(handler.getNumSpilled(), 0);
+                Assert.assertEquals(handler.getNumSpilled(), 0);
             }
             // consume memory frames
             writer.unfreeze();
@@ -470,7 +460,7 @@
      * Discard = true; discard only 5%
      * Fixed size frames
      */
-    @org.junit.Test
+    @Test
     public void testMemoryFixedSizeFrameNoSpillWithDiscard() {
         try {
             int discardTestFrames = 100;
@@ -481,8 +471,8 @@
             TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
-            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE,
-                    DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool framePool =
+                    new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
             handler.open();
             VSizeFrame frame = new VSizeFrame(ctx);
@@ -492,8 +482,8 @@
             }
             // Next 5 calls call should NOT block but should discard.
             double numDiscarded = 0.0;
-            boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa
-                    .getMaxFractionDiscard();
+            boolean nextShouldDiscard =
+                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
             while (nextShouldDiscard) {
                 handler.nextFrame(frame.getBuffer());
                 numDiscarded++;
@@ -505,9 +495,9 @@
                 Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
             } else {
                 // Check that no records were discarded
-                assertEquals((int) numDiscarded, handler.getNumDiscarded());
+                Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
                 // Check that one frame is spilled
-                assertEquals(handler.getNumSpilled(), 0);
+                Assert.assertEquals(handler.getNumSpilled(), 0);
             }
             // consume memory frames
             writer.unfreeze();
@@ -527,13 +517,13 @@
      * Discard = false;
      * Fixed size frames
      */
-    @org.junit.Test
+    @Test
     public void testMemoryFixedSizeFrameWithSpillNoDiscard() {
         try {
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // Spill budget = Memory budget, No discard
-            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES,
-                    DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa =
+                    createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
             // Non-Active Writer
             TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
@@ -550,9 +540,9 @@
             Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
             result.get();
             // Check that no records were discarded
-            assertEquals(handler.getNumDiscarded(), 0);
+            Assert.assertEquals(handler.getNumDiscarded(), 0);
             // Check that one frame is spilled
-            assertEquals(handler.getNumSpilled(), 1);
+            Assert.assertEquals(handler.getNumSpilled(), 1);
             // consume memory frames
             writer.unfreeze();
             handler.close();
@@ -571,7 +561,7 @@
      * Fixed size frames
      * Very fast next operator
      */
-    @org.junit.Test
+    @Test
     public void testMemoryFixedSizeFrameNoDiskNoDiscardFastConsumer() {
         try {
             int numRounds = 10;
@@ -579,8 +569,8 @@
             // No spill, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
-                    false);
+            TestFrameWriter writer =
+                    FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -612,7 +602,7 @@
      * Fixed size frames
      * Slow next operator
      */
-    @org.junit.Test
+    @Test
     public void testMemoryFixedSizeFrameNoDiskNoDiscardSlowConsumer() {
         try {
             int numRounds = 10;
@@ -620,8 +610,8 @@
             // No spill, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
-                    false);
+            TestFrameWriter writer =
+                    FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -653,6 +643,7 @@
      * Discard = false
      * VarSizeFrame
      */
+    @Test
     public void testMemoryVarSizeFrameNoDiskNoDiscard() {
         try {
             Random random = new Random();
@@ -682,12 +673,13 @@
                 Assert.fail();
             }
             // Check that no records were discarded
-            assertEquals(handler.getNumDiscarded(), 0);
+            Assert.assertEquals(handler.getNumDiscarded(), 0);
             // Check that no records were spilled
-            assertEquals(handler.getNumSpilled(), 0);
+            Assert.assertEquals(handler.getNumSpilled(), 0);
             // Check that number of stalled is not greater than 1
             Assert.assertTrue(handler.getNumStalled() <= 1);
             writer.unfreeze();
+            handler.close();
             result.get();
         } catch (Throwable th) {
             th.printStackTrace();
@@ -701,15 +693,15 @@
      * Discard = false;
      * Variable size frames
      */
-    @org.junit.Test
+    @Test
     public void testMemoryVarSizeFrameWithSpillNoDiscard() {
         for (int k = 0; k < 1000; k++) {
             try {
                 Random random = new Random();
                 IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
                 // Spill budget = Memory budget, No discard
-                FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES,
-                        DISCARD_ALLOWANCE);
+                FeedPolicyAccessor fpa =
+                        createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
                 // Non-Active Writer
                 TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
                 writer.freeze();
@@ -719,8 +711,10 @@
                 handler.open();
                 ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
                 int multiplier = 1;
+                int numOfBuffersInMemory = 0;
                 // add NUM_FRAMES times
                 while ((multiplier <= framePool.remaining())) {
+                    numOfBuffersInMemory++;
                     handler.nextFrame(buffer);
                     multiplier = random.nextInt(10) + 1;
                     buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
@@ -729,12 +723,11 @@
                 Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
                 result.get();
                 // Check that no records were discarded
-                assertEquals(handler.getNumDiscarded(), 0);
+                Assert.assertEquals(handler.getNumDiscarded(), 0);
                 // Check that one frame is spilled
-                assertEquals(handler.getNumSpilled(), 1);
-                int numOfBuffersInMemory = handler.getInternalBuffer().size();
+                Assert.assertEquals(handler.getNumSpilled(), 1);
                 // consume memory frames
-                while (numOfBuffersInMemory > 0) {
+                while (numOfBuffersInMemory > 1) {
                     writer.kick();
                     numOfBuffersInMemory--;
                 }
@@ -756,7 +749,7 @@
      * Discard = false;
      * Fixed size frames
      */
-    @org.junit.Test
+    @Test
     public void testMemoryFixedSizeFrameNoDiskNoDiscard() {
         try {
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);