ASTERIXDB-1883: FeedRuntimeInputHandler issues
Recent commit https://asterix-gerrit.ics.uci.edu/#/c/1591/ includes a
number of new issues in FeedRuntimeInputHandler:
- hangs caused by race condition with mutex & inbox on close (observed
on Jenkins)
- CPU spin on disk spilling on empty inbox
- The writer is not flushed in as many cases as before
Change-Id: I7e091f65eb5f3a76277803b3197d490d3ef2fc04
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1677
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 329451d..11561b5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -19,6 +19,7 @@
package org.apache.asterix.external.feed.dataflow;
import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -51,7 +52,10 @@
private static final Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
private static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8;
private static final boolean DEBUG = false;
- private final Object mutex = new Object();
+ private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0);
+ private static final ByteBuffer SPILLED = ByteBuffer.allocate(0);
+ private static final ByteBuffer FAIL = ByteBuffer.allocate(0);
+
private final FeedExceptionHandler exceptionHandler;
private final FrameSpiller spiller;
private final FeedPolicyAccessor fpa;
@@ -59,7 +63,7 @@
private final int initialFrameSize;
private final FrameTransporter consumer;
private final Thread consumerThread;
- private final LinkedBlockingQueue<ByteBuffer> inbox;
+ private final BlockingQueue<ByteBuffer> inbox;
private final ConcurrentFramePool framePool;
private Mode mode = Mode.PROCESS;
private int total = 0;
@@ -90,16 +94,24 @@
@Override
public void open() throws HyracksDataException {
- synchronized (writer) {
- writer.open();
- consumerThread.start();
- }
+ writer.open();
+ consumerThread.start();
}
@Override
public void fail() throws HyracksDataException {
- synchronized (writer) {
- writer.fail();
+ ByteBuffer buffer = inbox.poll();
+ while (buffer != null) {
+ if (buffer != SPILLED) {
+ framePool.release(buffer);
+ }
+ buffer = inbox.poll();
+ }
+ try {
+ inbox.put(FAIL);
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARNING, "interrupted", e);
+ Thread.currentThread().interrupt();
}
}
@@ -110,28 +122,18 @@
// If we use nextframe, chances are this frame will also be
// flushed into the spilled file. This causes problem when trying to
// read the frame and the size info is lost.
- inbox.put(ByteBuffer.allocate(0));
- synchronized (mutex) {
- if (DEBUG) {
- LOGGER.info("Producer is waking up consumer");
- }
- mutex.notify();
- }
+ inbox.put(POISON_PILL);
consumerThread.join();
} catch (InterruptedException e) {
- LOGGER.log(Level.WARNING, e.getMessage(), e);
- }
- try {
- framePool.release(inbox);
- } catch (Throwable th) {
- LOGGER.log(Level.WARNING, th.getMessage(), th);
+ LOGGER.log(Level.WARNING, "interrupted", e);
+ Thread.currentThread().interrupt();
}
try {
if (spiller != null) {
spiller.close();
}
} catch (Throwable th) {
- LOGGER.log(Level.WARNING, th.getMessage(), th);
+ LOGGER.log(Level.WARNING, "exception closing spiller", th);
} finally {
writer.close();
}
@@ -163,8 +165,11 @@
}
break;
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
} catch (Throwable th) {
- throw new HyracksDataException(th);
+ throw HyracksDataException.create(th);
}
}
@@ -182,7 +187,7 @@
}
}
- private void discard(ByteBuffer frame) throws HyracksDataException {
+ private void discard(ByteBuffer frame) throws HyracksDataException, InterruptedException {
if (DEBUG) {
LOGGER.info("starting discard(frame)");
}
@@ -206,7 +211,7 @@
}
numProcessedInMemory++;
next.put(frame);
- inbox.offer(next);
+ inbox.put(next);
mode = Mode.PROCESS;
return;
}
@@ -224,7 +229,7 @@
}
}
- private void exitProcessState(ByteBuffer frame) throws HyracksDataException {
+ private void exitProcessState(ByteBuffer frame) throws HyracksDataException, InterruptedException {
if (fpa.spillToDiskOnCongestion()) {
mode = Mode.SPILL;
spiller.open();
@@ -237,7 +242,7 @@
}
}
- private void discardOrStall(ByteBuffer frame) throws HyracksDataException {
+ private void discardOrStall(ByteBuffer frame) throws HyracksDataException, InterruptedException {
if (fpa.discardOnCongestion()) {
mode = Mode.DISCARD;
discard(frame);
@@ -249,48 +254,33 @@
}
}
- private void stall(ByteBuffer frame) throws HyracksDataException {
- try {
+ private void stall(ByteBuffer frame) throws HyracksDataException, InterruptedException {
+ if (DEBUG) {
+ LOGGER.info("in stall(frame). So far, I have stalled " + numStalled);
+ }
+ numStalled++;
+ // If spilling is enabled, we wait on the spiller
+ if (fpa.spillToDiskOnCongestion()) {
if (DEBUG) {
- LOGGER.info("in stall(frame). So far, I have stalled " + numStalled);
+ LOGGER.info("in stall(frame). Spilling is enabled so we will attempt to spill");
}
- numStalled++;
- // If spilling is enabled, we wait on the spiller
- if (fpa.spillToDiskOnCongestion()) {
- if (DEBUG) {
- LOGGER.info("in stall(frame). Spilling is enabled so we will attempt to spill");
- }
- waitforSpillSpace();
- spiller.spill(frame);
- numSpilled++;
- synchronized (mutex) {
- if (DEBUG) {
- LOGGER.info("Producer is waking up consumer");
- }
- mutex.notify();
- }
- return;
- }
- if (DEBUG) {
- LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to frame pool");
- }
- // Spilling is disabled, we subscribe to feedMemoryManager
- frameAction.setFrame(frame);
- framePool.subscribe(frameAction);
- ByteBuffer temp = frameAction.retrieve();
- inbox.put(temp);
- numProcessedInMemory++;
- if (DEBUG) {
- LOGGER.info("stall(frame) has been completed. Notifying the consumer that a frame is ready");
- }
- synchronized (mutex) {
- if (DEBUG) {
- LOGGER.info("Producer is waking up consumer");
- }
- mutex.notify();
- }
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
+ waitforSpillSpace();
+ spiller.spill(frame);
+ numSpilled++;
+ inbox.put(SPILLED);
+ return;
+ }
+ if (DEBUG) {
+ LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to frame pool");
+ }
+ // Spilling is disabled, we subscribe to feedMemoryManager
+ frameAction.setFrame(frame);
+ framePool.subscribe(frameAction);
+ ByteBuffer temp = frameAction.retrieve();
+ inbox.put(temp);
+ numProcessedInMemory++;
+ if (DEBUG) {
+ LOGGER.info("stall(frame) has been completed. Notifying the consumer that a frame is ready");
}
}
@@ -307,19 +297,14 @@
}
}
- private void process(ByteBuffer frame) throws HyracksDataException {
+ private void process(ByteBuffer frame) throws HyracksDataException, InterruptedException {
// Get a page from frame pool
ByteBuffer next = (frame.capacity() <= framePool.getMaxFrameSize()) ? getFreeBuffer(frame.capacity()) : null;
if (next != null) {
// Got a page from memory pool
numProcessedInMemory++;
next.put(frame);
- try {
- inbox.put(next);
- notifyMemoryConsumer();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
+ inbox.put(next);
} else {
if (DEBUG) {
LOGGER.info("Couldn't allocate memory --> exitProcessState(frame)");
@@ -329,46 +314,29 @@
}
}
- private void notifyMemoryConsumer() {
- if (inbox.size() == 1) {
- synchronized (mutex) {
- if (DEBUG) {
- LOGGER.info("Producer is waking up consumer");
- }
- mutex.notify();
- }
- }
- }
-
- private void spill(ByteBuffer frame) throws HyracksDataException {
+ private void spill(ByteBuffer frame) throws HyracksDataException, InterruptedException {
if (spiller.switchToMemory()) {
- synchronized (mutex) {
- // Check if there is memory
- ByteBuffer next = null;
- if (frame.capacity() <= framePool.getMaxFrameSize()) {
- next = getFreeBuffer(frame.capacity());
- }
- if (next != null) {
- spiller.close();
- numProcessedInMemory++;
- next.put(frame);
- inbox.offer(next);
- notifyMemoryConsumer();
- mode = Mode.PROCESS;
- } else {
- // spill. This will always succeed since spilled = 0 (TODO must verify that budget can't be 0)
- spiller.spill(frame);
- numSpilled++;
- if (DEBUG) {
- LOGGER.info("Producer is waking up consumer");
- }
- mutex.notify();
- }
+ // Check if there is memory
+ ByteBuffer next = null;
+ if (frame.capacity() <= framePool.getMaxFrameSize()) {
+ next = getFreeBuffer(frame.capacity());
+ }
+ if (next != null) {
+ spiller.close();
+ numProcessedInMemory++;
+ next.put(frame);
+ inbox.put(next);
+ mode = Mode.PROCESS;
+ } else {
+ // spill. This will always succeed since spilled = 0 (TODO must verify that budget can't be 0)
+ spiller.spill(frame);
+ numSpilled++;
+ inbox.put(SPILLED);
}
} else {
// try to spill. If failed switch to either discard or stall
if (spiller.spill(frame)) {
- notifyDiskConsumer();
+ inbox.put(SPILLED);
numSpilled++;
} else {
if (fpa.discardOnCongestion()) {
@@ -381,22 +349,6 @@
}
}
- private void notifyDiskConsumer() {
- if (spiller.remaining() == 1) {
- synchronized (mutex) {
- if (DEBUG) {
- LOGGER.info("Producer is waking up consumer");
- }
- mutex.notify();
- }
- }
- }
-
- @Override
- public void flush() throws HyracksDataException {
- // no op
- }
-
public int getNumDiscarded() {
return numDiscarded;
}
@@ -460,37 +412,28 @@
boolean running = true;
while (running) {
frame = inbox.poll();
-
- if (frame == null && spiller != null) {
- running = clearLocalFrames();
- continue;
- }
-
if (frame == null) {
- synchronized (mutex) {
- LOGGER.info("Consumer is going to sleep");
- mutex.wait();
- LOGGER.info("Consumer is waking up");
- }
- continue;
+ writer.flush();
+ frame = inbox.take();
}
-
- // process
- if (frame.capacity() == 0) {
+ if (frame == SPILLED) {
+ running = clearLocalFrames();
+ } else if (frame == POISON_PILL) {
running = false;
- if (spiller != null ) {
+ if (spiller != null) {
clearLocalFrames();
}
+ } else if (frame == FAIL) {
+ running = false;
+ writer.fail();
} else {
+ // process
try {
- if (consume(frame) != null) {
- return;
- }
+ running = consume(frame) == null;
} finally {
framePool.release(frame);
}
}
- writer.flush();
}
} catch (Throwable th) {
this.cause = th;
@@ -507,7 +450,7 @@
return total;
}
- public LinkedBlockingQueue<ByteBuffer> getInternalBuffer() {
+ public BlockingQueue<ByteBuffer> getInternalBuffer() {
return inbox;
}
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index 2237505..b6343b9 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -48,14 +48,13 @@
import org.apache.hyracks.api.test.TestFrameWriter;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.test.support.TestUtils;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
import org.mockito.Mockito;
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-public class InputHandlerTest extends TestCase {
+public class InputHandlerTest {
private static final int DEFAULT_FRAME_SIZE = 32768;
private static final int NUM_FRAMES = 128;
@@ -68,14 +67,6 @@
private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1);
private volatile static HyracksDataException cause = null;
- public InputHandlerTest(String testName) {
- super(testName);
- }
-
- public static Test suite() {
- return new TestSuite(InputHandlerTest.class);
- }
-
private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer,
FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException {
FrameTupleAccessor fta = Mockito.mock(FrameTupleAccessor.class);
@@ -122,12 +113,12 @@
}
}
- @org.junit.Before
+ @Before
public void testCleanBefore() throws IOException {
cleanDiskFiles();
}
- @org.junit.After
+ @After
public void testCleanAfter() throws IOException {
cleanDiskFiles();
}
@@ -139,11 +130,11 @@
Random random = new Random();
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
// No spill, No discard
- FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE,
- DISCARD_ALLOWANCE);
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
- false);
+ TestFrameWriter writer =
+ FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
// FramePool
ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -175,17 +166,17 @@
}
}
- @org.junit.Test
+ @Test
public void testZeroMemoryFixedSizeFrameWithDiskNoDiscard() {
try {
int numRounds = 10;
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
// No spill, No discard
- FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE,
- DISCARD_ALLOWANCE);
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
- false);
+ TestFrameWriter writer =
+ FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
// FramePool
ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -213,7 +204,6 @@
} finally {
Assert.assertNull(cause);
}
-
}
/*
@@ -221,7 +211,7 @@
* Discard = true; discard only 5%
* Fixed size frames
*/
- @org.junit.Test
+ @Test
public void testMemoryVarSizeFrameWithSpillWithDiscard() {
try {
int numberOfMemoryFrames = 50;
@@ -230,14 +220,14 @@
int totalMinFrames = 0;
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
// Spill budget = Memory budget, No discard
- FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames,
- DISCARD_ALLOWANCE);
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
// Non-Active Writer
TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
writer.freeze();
// FramePool
- ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE,
- DEFAULT_FRAME_SIZE);
+ ConcurrentFramePool framePool =
+ new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
handler.open();
ByteBuffer buffer1 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
@@ -300,8 +290,8 @@
Assert.assertEquals(0, handler.getNumDiscarded());
// We can only discard one frame
double numDiscarded = 0;
- boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa
- .getMaxFractionDiscard();
+ boolean nextShouldDiscard =
+ ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
while (nextShouldDiscard) {
handler.nextFrame(buffer5);
numDiscarded++;
@@ -333,21 +323,21 @@
* Discard = true
* Fixed size frames
*/
- @org.junit.Test
+ @Test
public void testMemoryFixedSizeFrameWithSpillWithDiscard() {
try {
int numberOfMemoryFrames = 50;
int numberOfSpillFrames = 50;
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
// Spill budget = Memory budget, No discard
- FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames,
- DISCARD_ALLOWANCE);
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
// Non-Active Writer
TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
writer.freeze();
// FramePool
- ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE,
- DEFAULT_FRAME_SIZE);
+ ConcurrentFramePool framePool =
+ new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
handler.open();
VSizeFrame frame = new VSizeFrame(ctx);
@@ -370,8 +360,8 @@
Assert.assertEquals(0, handler.getNumDiscarded());
// We can only discard one frame
double numDiscarded = 0;
- boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa
- .getMaxFractionDiscard();
+ boolean nextShouldDiscard =
+ ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
while (nextShouldDiscard) {
handler.nextFrame(frame.getBuffer());
numDiscarded++;
@@ -407,7 +397,7 @@
* Discard = true; discard only 5%
* Fixed size frames
*/
- @org.junit.Test
+ @Test
public void testMemoryVariableSizeFrameNoSpillWithDiscard() {
try {
int discardTestFrames = 100;
@@ -419,8 +409,8 @@
TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
writer.freeze();
// FramePool
- ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE,
- DEFAULT_FRAME_SIZE);
+ ConcurrentFramePool framePool =
+ new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
handler.open();
// add NUM_FRAMES times
@@ -436,8 +426,8 @@
}
// Next call should NOT block but should discard.
double numDiscarded = 0.0;
- boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa
- .getMaxFractionDiscard();
+ boolean nextShouldDiscard =
+ ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
while (nextShouldDiscard) {
handler.nextFrame(buffer);
numDiscarded++;
@@ -448,9 +438,9 @@
Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
} else {
// Check that no records were discarded
- assertEquals((int) numDiscarded, handler.getNumDiscarded());
+ Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
// Check that one frame is spilled
- assertEquals(handler.getNumSpilled(), 0);
+ Assert.assertEquals(handler.getNumSpilled(), 0);
}
// consume memory frames
writer.unfreeze();
@@ -470,7 +460,7 @@
* Discard = true; discard only 5%
* Fixed size frames
*/
- @org.junit.Test
+ @Test
public void testMemoryFixedSizeFrameNoSpillWithDiscard() {
try {
int discardTestFrames = 100;
@@ -481,8 +471,8 @@
TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
writer.freeze();
// FramePool
- ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE,
- DEFAULT_FRAME_SIZE);
+ ConcurrentFramePool framePool =
+ new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
handler.open();
VSizeFrame frame = new VSizeFrame(ctx);
@@ -492,8 +482,8 @@
}
// Next 5 calls call should NOT block but should discard.
double numDiscarded = 0.0;
- boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa
- .getMaxFractionDiscard();
+ boolean nextShouldDiscard =
+ ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
while (nextShouldDiscard) {
handler.nextFrame(frame.getBuffer());
numDiscarded++;
@@ -505,9 +495,9 @@
Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
} else {
// Check that no records were discarded
- assertEquals((int) numDiscarded, handler.getNumDiscarded());
+ Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
// Check that one frame is spilled
- assertEquals(handler.getNumSpilled(), 0);
+ Assert.assertEquals(handler.getNumSpilled(), 0);
}
// consume memory frames
writer.unfreeze();
@@ -527,13 +517,13 @@
* Discard = false;
* Fixed size frames
*/
- @org.junit.Test
+ @Test
public void testMemoryFixedSizeFrameWithSpillNoDiscard() {
try {
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
// Spill budget = Memory budget, No discard
- FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES,
- DISCARD_ALLOWANCE);
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
// Non-Active Writer
TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
writer.freeze();
@@ -550,9 +540,9 @@
Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
result.get();
// Check that no records were discarded
- assertEquals(handler.getNumDiscarded(), 0);
+ Assert.assertEquals(handler.getNumDiscarded(), 0);
// Check that one frame is spilled
- assertEquals(handler.getNumSpilled(), 1);
+ Assert.assertEquals(handler.getNumSpilled(), 1);
// consume memory frames
writer.unfreeze();
handler.close();
@@ -571,7 +561,7 @@
* Fixed size frames
* Very fast next operator
*/
- @org.junit.Test
+ @Test
public void testMemoryFixedSizeFrameNoDiskNoDiscardFastConsumer() {
try {
int numRounds = 10;
@@ -579,8 +569,8 @@
// No spill, No discard
FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
- false);
+ TestFrameWriter writer =
+ FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
// FramePool
ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -612,7 +602,7 @@
* Fixed size frames
* Slow next operator
*/
- @org.junit.Test
+ @Test
public void testMemoryFixedSizeFrameNoDiskNoDiscardSlowConsumer() {
try {
int numRounds = 10;
@@ -620,8 +610,8 @@
// No spill, No discard
FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
// Non-Active Writer
- TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
- false);
+ TestFrameWriter writer =
+ FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
// FramePool
ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -653,6 +643,7 @@
* Discard = false
* VarSizeFrame
*/
+ @Test
public void testMemoryVarSizeFrameNoDiskNoDiscard() {
try {
Random random = new Random();
@@ -682,12 +673,13 @@
Assert.fail();
}
// Check that no records were discarded
- assertEquals(handler.getNumDiscarded(), 0);
+ Assert.assertEquals(handler.getNumDiscarded(), 0);
// Check that no records were spilled
- assertEquals(handler.getNumSpilled(), 0);
+ Assert.assertEquals(handler.getNumSpilled(), 0);
// Check that number of stalled is not greater than 1
Assert.assertTrue(handler.getNumStalled() <= 1);
writer.unfreeze();
+ handler.close();
result.get();
} catch (Throwable th) {
th.printStackTrace();
@@ -701,15 +693,15 @@
* Discard = false;
* Variable size frames
*/
- @org.junit.Test
+ @Test
public void testMemoryVarSizeFrameWithSpillNoDiscard() {
for (int k = 0; k < 1000; k++) {
try {
Random random = new Random();
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
// Spill budget = Memory budget, No discard
- FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES,
- DISCARD_ALLOWANCE);
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
// Non-Active Writer
TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
writer.freeze();
@@ -719,8 +711,10 @@
handler.open();
ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
int multiplier = 1;
+ int numOfBuffersInMemory = 0;
// add NUM_FRAMES times
while ((multiplier <= framePool.remaining())) {
+ numOfBuffersInMemory++;
handler.nextFrame(buffer);
multiplier = random.nextInt(10) + 1;
buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
@@ -729,12 +723,11 @@
Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
result.get();
// Check that no records were discarded
- assertEquals(handler.getNumDiscarded(), 0);
+ Assert.assertEquals(handler.getNumDiscarded(), 0);
// Check that one frame is spilled
- assertEquals(handler.getNumSpilled(), 1);
- int numOfBuffersInMemory = handler.getInternalBuffer().size();
+ Assert.assertEquals(handler.getNumSpilled(), 1);
// consume memory frames
- while (numOfBuffersInMemory > 0) {
+ while (numOfBuffersInMemory > 1) {
writer.kick();
numOfBuffersInMemory--;
}
@@ -756,7 +749,7 @@
* Discard = false;
* Fixed size frames
*/
- @org.junit.Test
+ @Test
public void testMemoryFixedSizeFrameNoDiskNoDiscard() {
try {
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);