Add Unit Tests for Feed Runtime Input Handler
Change-Id: I7088f489a7d53dee8cf6cdbf5baa7cd8d3884f55
Reviewed-on: https://asterix-gerrit.ics.uci.edu/866
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <michael.blow@couchbase.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 3920a03..d201a6a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -35,7 +35,7 @@
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
/**
- * TODO: Add unit test cases for this class
+ * TODO: Add Failure cases unit tests for this class
* Provides for error-handling and input-side buffering for a feed runtime.
* .............______.............
* ............|......|............
@@ -48,8 +48,9 @@
public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
private static final Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName());
- private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0);
private static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8;
+ private static final boolean DEBUG = false;
+ private final Object mutex = new Object();
private final FeedExceptionHandler exceptionHandler;
private final FrameSpiller spiller;
private final FeedPolicyAccessor fpa;
@@ -58,15 +59,16 @@
private final FrameTransporter consumer;
private final Thread consumerThread;
private final LinkedBlockingDeque<ByteBuffer> inbox;
- private final ConcurrentFramePool memoryManager;
+ private final ConcurrentFramePool framePool;
private Mode mode = Mode.PROCESS;
+ private int total = 0;
private int numDiscarded = 0;
private int numSpilled = 0;
private int numProcessedInMemory = 0;
private int numStalled = 0;
public FeedRuntimeInputHandler(IHyracksTaskContext ctx, FeedConnectionId connectionId, FeedRuntimeId runtimeId,
- IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool feedMemoryManager)
+ IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool framePool)
throws HyracksDataException {
this.writer = writer;
this.spiller =
@@ -76,13 +78,13 @@
fpa.getMaxSpillOnDisk());
this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
this.fpa = fpa;
- this.memoryManager = feedMemoryManager;
+ this.framePool = framePool;
this.inbox = new LinkedBlockingDeque<>();
this.consumer = new FrameTransporter();
- this.consumerThread = new Thread();
+ this.consumerThread = new Thread(consumer);
this.consumerThread.start();
this.initialFrameSize = ctx.getInitialFrameSize();
- this.frameAction = new FrameAction(inbox);
+ this.frameAction = new FrameAction();
}
@Override
@@ -101,15 +103,20 @@
@Override
public void close() throws HyracksDataException {
- inbox.add(POISON_PILL);
- notify();
+ consumer.poison();
+ synchronized (mutex) {
+ if (DEBUG) {
+ LOGGER.info("Producer is waking up consumer");
+ }
+ mutex.notify();
+ }
try {
consumerThread.join();
} catch (InterruptedException e) {
LOGGER.log(Level.WARNING, e.getMessage(), e);
}
try {
- memoryManager.release(inbox);
+ framePool.release(inbox);
} catch (Throwable th) {
LOGGER.log(Level.WARNING, th.getMessage(), th);
}
@@ -124,9 +131,13 @@
@Override
public void nextFrame(ByteBuffer frame) throws HyracksDataException {
try {
+ total++;
if (consumer.cause() != null) {
throw consumer.cause();
}
+ if (DEBUG) {
+ LOGGER.info("nextFrame() called. inputHandler is in mode: " + mode.toString());
+ }
switch (mode) {
case PROCESS:
process(frame);
@@ -148,25 +159,42 @@
}
}
+ // For unit testing purposes
+ public int framesOnDisk() {
+ return spiller.remaining();
+ }
+
private ByteBuffer getFreeBuffer(int frameSize) throws HyracksDataException {
int numFrames = frameSize / initialFrameSize;
if (numFrames == 1) {
- return memoryManager.get();
+ return framePool.get();
} else {
- return memoryManager.get(frameSize);
+ return framePool.get(frameSize);
}
}
private void discard(ByteBuffer frame) throws HyracksDataException {
+ if (DEBUG) {
+ LOGGER.info("starting discard(frame)");
+ }
if (fpa.spillToDiskOnCongestion()) {
+ if (DEBUG) {
+ LOGGER.info("Spilling to disk is enabled. Will try that");
+ }
if (spiller.spill(frame)) {
numSpilled++;
mode = Mode.SPILL;
return;
}
} else {
+ if (DEBUG) {
+ LOGGER.info("Spilling to disk is disabled. Will try to get a buffer");
+ }
ByteBuffer next = getFreeBuffer(frame.capacity());
if (next != null) {
+ if (DEBUG) {
+ LOGGER.info("Was able to get a buffer");
+ }
numProcessedInMemory++;
next.put(frame);
inbox.offer(next);
@@ -174,102 +202,168 @@
return;
}
}
- numDiscarded++;
+ if (((numDiscarded + 1.0) / total) > fpa.getMaxFractionDiscard()) {
+ if (DEBUG) {
+ LOGGER.info("in discard(frame). Discard allowance has been consumed. --> Stalling");
+ }
+ stall(frame);
+ } else {
+ if (DEBUG) {
+ LOGGER.info("in discard(frame). So far, I have discarded " + numDiscarded);
+ }
+ numDiscarded++;
+ }
}
- private synchronized void exitProcessState(ByteBuffer frame) throws HyracksDataException {
+ private void exitProcessState(ByteBuffer frame) throws HyracksDataException {
if (fpa.spillToDiskOnCongestion()) {
mode = Mode.SPILL;
spiller.open();
spill(frame);
} else {
+ if (DEBUG) {
+ LOGGER.info("Spilling is disabled --> discardOrStall(frame)");
+ }
discardOrStall(frame);
}
}
private void discardOrStall(ByteBuffer frame) throws HyracksDataException {
if (fpa.discardOnCongestion()) {
- numDiscarded++;
mode = Mode.DISCARD;
discard(frame);
} else {
+ if (DEBUG) {
+ LOGGER.info("Discard is disabled --> stall(frame)");
+ }
stall(frame);
}
}
private void stall(ByteBuffer frame) throws HyracksDataException {
try {
+ if (DEBUG) {
+ LOGGER.info("in stall(frame). So far, I have stalled " + numStalled);
+ }
numStalled++;
// If spilling is enabled, we wait on the spiller
if (fpa.spillToDiskOnCongestion()) {
- synchronized (spiller) {
- while (spiller.usedBudget() > MAX_SPILL_USED_BEFORE_RESUME) {
- spiller.wait();
- }
+ if (DEBUG) {
+ LOGGER.info("in stall(frame). Spilling is enabled so we will attempt to spill");
}
+ waitforSpillSpace();
spiller.spill(frame);
- synchronized (this) {
- notify();
+ numSpilled++;
+ synchronized (mutex) {
+ if (DEBUG) {
+ LOGGER.info("Producer is waking up consumer");
+ }
+ mutex.notify();
}
return;
}
+ if (DEBUG) {
+ LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to frame pool");
+ }
// Spilling is disabled, we subscribe to feedMemoryManager
frameAction.setFrame(frame);
- synchronized (frameAction) {
- if (memoryManager.subscribe(frameAction)) {
- frameAction.wait();
- }
+ framePool.subscribe(frameAction);
+ ByteBuffer temp = frameAction.retrieve();
+ inbox.put(temp);
+ numProcessedInMemory++;
+ if (DEBUG) {
+ LOGGER.info("stall(frame) has been completed. Notifying the consumer that a frame is ready");
}
- synchronized (this) {
- notify();
+ synchronized (mutex) {
+ if (DEBUG) {
+ LOGGER.info("Producer is waking up consumer");
+ }
+ mutex.notify();
}
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}
}
+ private void waitforSpillSpace() throws InterruptedException {
+ synchronized (spiller) {
+ while (spiller.usedBudget() > MAX_SPILL_USED_BEFORE_RESUME) {
+ if (DEBUG) {
+ LOGGER.info("in stall(frame). Spilling has been consumed. We will wait for it to be less than "
+ + MAX_SPILL_USED_BEFORE_RESUME + " consumed. Current consumption = "
+ + spiller.usedBudget());
+ }
+ spiller.wait();
+ }
+ }
+ }
+
private void process(ByteBuffer frame) throws HyracksDataException {
- // Get a page from
- ByteBuffer next = getFreeBuffer(frame.capacity());
+ // Get a page from frame pool
+ ByteBuffer next = (frame.capacity() <= framePool.getMaxFrameSize()) ? getFreeBuffer(frame.capacity()) : null;
if (next != null) {
+ // Got a page from memory pool
numProcessedInMemory++;
next.put(frame);
- inbox.offer(next);
- if (inbox.size() == 1) {
- synchronized (this) {
- notify();
- }
+ try {
+ inbox.put(next);
+ notifyMemoryConsumer();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
}
} else {
- // out of memory. we switch to next mode as per policy -- synchronized method
+ if (DEBUG) {
+ LOGGER.info("Couldn't allocate memory --> exitProcessState(frame)");
+ }
+ // Out of memory. we switch to next mode as per policy
exitProcessState(frame);
}
}
+ private void notifyMemoryConsumer() {
+ if (inbox.size() == 1) {
+ synchronized (mutex) {
+ if (DEBUG) {
+ LOGGER.info("Producer is waking up consumer");
+ }
+ mutex.notify();
+ }
+ }
+ }
+
private void spill(ByteBuffer frame) throws HyracksDataException {
if (spiller.switchToMemory()) {
- synchronized (this) {
+ synchronized (mutex) {
// Check if there is memory
- ByteBuffer next = getFreeBuffer(frame.capacity());
+ ByteBuffer next = null;
+ if (frame.capacity() <= framePool.getMaxFrameSize()) {
+ next = getFreeBuffer(frame.capacity());
+ }
if (next != null) {
spiller.close();
numProcessedInMemory++;
next.put(frame);
inbox.offer(next);
+ notifyMemoryConsumer();
mode = Mode.PROCESS;
} else {
- // spill. This will always succeed since spilled = 0 (must verify that budget can't be 0)
+ // spill. This will always succeed since spilled = 0 (TODO must verify that budget can't be 0)
spiller.spill(frame);
numSpilled++;
- notify();
+ if (DEBUG) {
+ LOGGER.info("Producer is waking up consumer");
+ }
+ mutex.notify();
}
}
} else {
// try to spill. If failed switch to either discard or stall
if (spiller.spill(frame)) {
+ notifyDiskConsumer();
numSpilled++;
} else {
if (fpa.discardOnCongestion()) {
+ mode = Mode.DISCARD;
discard(frame);
} else {
stall(frame);
@@ -278,11 +372,22 @@
}
}
+ private void notifyDiskConsumer() {
+ if (spiller.remaining() == 1) {
+ synchronized (mutex) {
+ if (DEBUG) {
+ LOGGER.info("Producer is waking up consumer");
+ }
+ mutex.notify();
+ }
+ }
+ }
+
public Mode getMode() {
return mode;
}
- public synchronized void setMode(Mode mode) {
+ public void setMode(Mode mode) {
this.mode = mode;
}
@@ -311,15 +416,22 @@
private class FrameTransporter implements Runnable {
private volatile Throwable cause;
+ private int consumed = 0;
+ private boolean poisoned = false;
public Throwable cause() {
return cause;
}
+ public void poison() {
+ poisoned = true;
+ }
+
private Throwable consume(ByteBuffer frame) {
while (frame != null) {
try {
writer.nextFrame(frame);
+ consumed++;
frame = null;
} catch (HyracksDataException e) {
// It is fine to catch throwable here since this thread is always expected to terminate gracefully
@@ -340,7 +452,7 @@
public void run() {
try {
ByteBuffer frame = inbox.poll();
- while (frame != POISON_PILL) {
+ while (true) {
if (frame != null) {
try {
if (consume(frame) != null) {
@@ -348,7 +460,7 @@
}
} finally {
// Done with frame.
- memoryManager.release(frame);
+ framePool.release(frame);
}
}
frame = inbox.poll();
@@ -366,13 +478,22 @@
writer.flush();
// At this point. We consumed all memory and spilled
// We can't assume the next will be in memory. what if there is 0 memory?
- synchronized (FeedRuntimeInputHandler.this) {
+ synchronized (mutex) {
frame = inbox.poll();
if (frame == null) {
// Nothing in memory
if (spiller.switchToMemory()) {
+ if (poisoned) {
+ break;
+ }
+ if (DEBUG) {
+ LOGGER.info("Consumer is going to sleep");
+ }
// Nothing in disk
- FeedRuntimeInputHandler.this.wait();
+ mutex.wait();
+ if (DEBUG) {
+ LOGGER.info("Consumer is waking up");
+ }
}
}
}
@@ -383,5 +504,18 @@
}
// cleanup will always be done through the close() call
}
+
+ @Override
+ public String toString() {
+ return "consumed: " + consumed;
+ }
+ }
+
+ public int getTotal() {
+ return total;
+ }
+
+ public LinkedBlockingDeque<ByteBuffer> getInternalBuffer() {
+ return inbox;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
index 4a2120a..f02b4aa 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameAction.java
@@ -19,29 +19,45 @@
package org.apache.asterix.external.feed.dataflow;
import java.nio.ByteBuffer;
-import java.util.concurrent.LinkedBlockingDeque;
-import rx.functions.Action1;
+import org.apache.log4j.Logger;
-public class FrameAction implements Action1<ByteBuffer> {
- private final LinkedBlockingDeque<ByteBuffer> inbox;
+public class FrameAction {
+ private static final boolean DEBUG = false;
+ private static final Logger LOGGER = Logger.getLogger(FrameAction.class.getName());
+ private ByteBuffer allocated;
private ByteBuffer frame;
- public FrameAction(LinkedBlockingDeque<ByteBuffer> inbox) {
- this.inbox = inbox;
- }
-
- @Override
public void call(ByteBuffer freeFrame) {
+ if (DEBUG) {
+ LOGGER.info("FrameAction: My subscription is being answered");
+ }
freeFrame.put(frame);
- inbox.add(freeFrame);
synchronized (this) {
- notify();
+ allocated = freeFrame;
+ if (DEBUG) {
+ LOGGER.info("FrameAction: Waking up waiting threads");
+ }
+ notifyAll();
}
}
- public ByteBuffer getFrame() {
- return frame;
+ public synchronized ByteBuffer retrieve() throws InterruptedException {
+ if (DEBUG) {
+ LOGGER.info("FrameAction: Attempting to get allocated buffer");
+ }
+ while (allocated == null) {
+ if (DEBUG) {
+ LOGGER.info("FrameAction: Allocated buffer is not ready yet. I will wait for it");
+ }
+ wait();
+ if (DEBUG) {
+ LOGGER.info("FrameAction: Awoken Up");
+ }
+ }
+ ByteBuffer temp = allocated;
+ allocated = null;
+ return temp;
}
public void setFrame(ByteBuffer frame) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
index f0d226a..a2f19bb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
@@ -44,6 +44,7 @@
public class FrameSpiller {
private static final Logger LOGGER = Logger.getLogger(FrameSpiller.class.getName());
private static final int FRAMES_PER_FILE = 1024;
+ public static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8;
private final String fileNamePrefix;
private final ArrayDeque<File> files = new ArrayDeque<>();
@@ -88,11 +89,11 @@
}
public synchronized ByteBuffer next() throws HyracksDataException {
+ frame.reset();
+ if (totalReadCount == totalWriteCount) {
+ return null;
+ }
try {
- frame.reset();
- if (totalReadCount == totalWriteCount) {
- return null;
- }
if (currentReadFile == null) {
if (!files.isEmpty()) {
currentReadFile = files.pop();
@@ -126,6 +127,10 @@
return frame.getBuffer();
} catch (Exception e) {
throw new HyracksDataException(e);
+ } finally {
+ synchronized (this) {
+ notify();
+ }
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
index 25aa86a..e5543d6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ConcurrentFramePool.java
@@ -28,10 +28,15 @@
import org.apache.asterix.external.feed.dataflow.FrameAction;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
public class ConcurrentFramePool {
+ private static final boolean DEBUG = false;
private static final String ERROR_INVALID_FRAME_SIZE =
"The size should be an integral multiple of the default frame size";
+ private static final String ERROR_LARGER_THAN_BUDGET_REQUEST =
+ "The requested frame size must not be greater than the allocated budget";
+ private static final Logger LOGGER = Logger.getLogger(ConcurrentFramePool.class.getName());
private final String nodeId;
private final int budget;
private final int defaultFrameSize;
@@ -49,11 +54,31 @@
this.largeFramesPools = new HashMap<>();
}
+ public int getMaxFrameSize() {
+ return budget * defaultFrameSize;
+ }
+
public synchronized ByteBuffer get() {
+ // Subscribers have higher priority
+ if (subscribers.isEmpty()) {
+ return doGet();
+ }
+ if (DEBUG) {
+ LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
+ + subscribers.size());
+ }
+ return null;
+ }
+
+ private ByteBuffer doGet() {
if (handedOut < budget) {
handedOut++;
return allocate();
}
+ if (DEBUG) {
+ LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
+ + ", Requested = 1");
+ }
return null;
}
@@ -61,11 +86,15 @@
return budget - handedOut;
}
- public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException {
+ private ByteBuffer doGet(int bufferSize) throws HyracksDataException {
+ // Subscribers have higher priority
if (bufferSize % defaultFrameSize != 0) {
throw new HyracksDataException(ERROR_INVALID_FRAME_SIZE);
}
int multiplier = bufferSize / defaultFrameSize;
+ if (multiplier > budget) {
+ throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
+ }
if (handedOut + multiplier <= budget) {
handedOut += multiplier;
ArrayDeque<ByteBuffer> largeFramesPool = largeFramesPools.get(multiplier);
@@ -76,9 +105,26 @@
created += multiplier;
return ByteBuffer.allocate(bufferSize);
}
- return largeFramesPool.poll();
+ ByteBuffer buffer = largeFramesPool.poll();
+ buffer.clear();
+ return buffer;
}
// Not enough budget
+ if (DEBUG) {
+ LOGGER.info("Unable to allocate buffer without exceeding budget. Remaining = " + remaining()
+ + ", Requested = " + multiplier);
+ }
+ return null;
+ }
+
+ public synchronized ByteBuffer get(int bufferSize) throws HyracksDataException {
+ if (subscribers.isEmpty()) {
+ return doGet(bufferSize);
+ }
+ if (DEBUG) {
+ LOGGER.info("Unable to allocate buffer since subscribers are in-line. Number of subscribers = "
+ + subscribers.size());
+ }
return null;
}
@@ -121,7 +167,9 @@
created++;
return ByteBuffer.allocate(defaultFrameSize);
} else {
- return pool.pop();
+ ByteBuffer buffer = pool.pop();
+ buffer.clear();
+ return buffer;
}
}
@@ -150,6 +198,9 @@
public synchronized void release(ByteBuffer buffer) throws HyracksDataException {
int multiples = buffer.capacity() / defaultFrameSize;
handedOut -= multiples;
+ if (DEBUG) {
+ LOGGER.info("Releasing " + multiples + " frames. Remaining frames = " + remaining());
+ }
if (multiples == 1) {
pool.add(buffer);
} else {
@@ -163,22 +214,48 @@
// check subscribers
while (!subscribers.isEmpty()) {
FrameAction frameAction = subscribers.peek();
+ ByteBuffer freeBuffer;
// check if we have enough and answer immediately.
if (frameAction.getSize() == defaultFrameSize) {
- buffer = get();
+ if (DEBUG) {
+ LOGGER.info("Attempting to callback a subscriber that requested 1 frame");
+ }
+ freeBuffer = doGet();
} else {
- buffer = get(frameAction.getSize());
+ if (DEBUG) {
+ LOGGER.info("Attempting to callback a subscriber that requested "
+ + frameAction.getSize() / defaultFrameSize + " frames");
+ }
+ freeBuffer = doGet(frameAction.getSize());
}
- if (buffer != null) {
+ if (freeBuffer != null) {
+ int handedOutBeforeCall = handedOut;
try {
- frameAction.call(buffer);
+ frameAction.call(freeBuffer);
+ } catch (Exception e) {
+ LOGGER.error("Error while attempting to answer a subscription. Buffer will be reclaimed", e);
+ // TODO(amoudi): Add test cases and get rid of recursion
+ if (handedOut == handedOutBeforeCall) {
+ release(freeBuffer);
+ }
+ throw e;
} finally {
subscribers.remove();
+ if (DEBUG) {
+ LOGGER.info(
+ "A subscription has been satisfied. " + subscribers.size() + " remaining subscribers");
+ }
}
} else {
+ if (DEBUG) {
+ LOGGER.info("Failed to allocate requested frames");
+ }
break;
}
}
+ if (DEBUG) {
+ LOGGER.info(subscribers.size() + " remaining subscribers");
+ }
}
public synchronized boolean subscribe(FrameAction frameAction) throws HyracksDataException {
@@ -187,18 +264,30 @@
ByteBuffer buffer;
// check if we have enough and answer immediately.
if (frameAction.getSize() == defaultFrameSize) {
- buffer = get();
+ buffer = doGet();
} else {
- buffer = get(frameAction.getSize());
+ buffer = doGet(frameAction.getSize());
}
if (buffer != null) {
frameAction.call(buffer);
// There is no need to subscribe. perform action and return false
return false;
}
+ } else {
+ int multiplier = frameAction.getSize() / defaultFrameSize;
+ if (multiplier > budget) {
+ throw new HyracksDataException(ERROR_LARGER_THAN_BUDGET_REQUEST);
+ }
}
// none of the above, add to subscribers and return true
subscribers.add(frameAction);
return true;
}
+
+ /*
+ * For unit testing purposes
+ */
+ public Collection<FrameAction> getSubscribers() {
+ return subscribers;
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
index f888542..18d4cff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/FeedRuntimeId.java
@@ -35,11 +35,11 @@
private final String targetId;
private final int hashCode;
- public FeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition, String operandId) {
+ public FeedRuntimeId(FeedId feedId, FeedRuntimeType runtimeType, int partition, String targetId) {
this.feedId = feedId;
this.runtimeType = runtimeType;
this.partition = partition;
- this.targetId = operandId;
+ this.targetId = targetId;
this.hashCode = toString().hashCode();
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
similarity index 74%
rename from asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
rename to asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
index 8a6d1b6..444d8a5 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/FeedMemoryManagerUnitTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
@@ -22,8 +22,10 @@
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Random;
+import java.util.concurrent.LinkedBlockingDeque;
import org.apache.asterix.common.config.AsterixFeedProperties;
+import org.apache.asterix.external.feed.dataflow.FrameAction;
import org.apache.asterix.external.feed.management.ConcurrentFramePool;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.junit.Assert;
@@ -36,7 +38,7 @@
import junit.framework.TestSuite;
@RunWith(PowerMockRunner.class)
-public class FeedMemoryManagerUnitTest extends TestCase {
+public class ConcurrentFramePoolUnitTest extends TestCase {
private static final int DEFAULT_FRAME_SIZE = 32768;
private static final int NUM_FRAMES = 2048;
@@ -44,8 +46,9 @@
private static final int NUM_THREADS = 8;
private static final int MAX_SIZE = 52;
private static final double RELEASE_PROBABILITY = 0.20;
+ private volatile static HyracksDataException cause = null;
- public FeedMemoryManagerUnitTest(String testName) {
+ public ConcurrentFramePoolUnitTest(String testName) {
super(testName);
}
@@ -53,7 +56,7 @@
* @return the suite of tests being tested
*/
public static Test suite() {
- return new TestSuite(FeedMemoryManagerUnitTest.class);
+ return new TestSuite(ConcurrentFramePoolUnitTest.class);
}
@org.junit.Test
@@ -67,6 +70,7 @@
i++;
}
Assert.assertEquals(i, NUM_FRAMES);
+ Assert.assertNull(cause);
}
@org.junit.Test
@@ -97,6 +101,7 @@
th.printStackTrace();
Assert.fail(th.getMessage());
}
+ Assert.assertNull(cause);
}
@org.junit.Test
@@ -131,6 +136,7 @@
th.printStackTrace();
Assert.fail(th.getMessage());
}
+ Assert.assertNull(cause);
}
@org.junit.Test
@@ -170,6 +176,7 @@
th.printStackTrace();
Assert.fail(th.getMessage());
}
+ Assert.assertNull(cause);
}
@org.junit.Test
@@ -201,6 +208,7 @@
}
stack.clear();
Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
+ Assert.assertNull(cause);
}
@org.junit.Test
@@ -231,6 +239,7 @@
th.printStackTrace();
Assert.fail(th.getMessage());
}
+ Assert.assertNull(cause);
}
@org.junit.Test
@@ -281,6 +290,8 @@
} catch (Throwable th) {
th.printStackTrace();
Assert.fail(th.getMessage());
+ } finally {
+ Assert.assertNull(cause);
}
}
@@ -315,6 +326,125 @@
} catch (Throwable th) {
th.printStackTrace();
Assert.fail(th.getMessage());
+ } finally {
+ Assert.assertNull(cause);
+ }
+ }
+
+ @org.junit.Test
+ public void testFixedSizeSubscribtion() {
+ try {
+ AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+ Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+ int i = 0;
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
+ FrameAction frameAction = new FrameAction();
+ frameAction.setFrame(buffer);
+ while (!fmm.subscribe(frameAction)) {
+ buffers.put(frameAction.retrieve());
+ i++;
+ }
+ // One subscriber.
+ // Check that all frames have been consumed
+ Assert.assertEquals(i, NUM_FRAMES);
+ // Release a frame (That will be handed out to the subscriber)
+ fmm.release(buffers.take());
+ // Check that all frames have been consumed (since the released frame have been handed to the consumer)
+ Assert.assertEquals(0, fmm.remaining());
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ } finally {
+ Assert.assertNull(cause);
+ }
+ }
+
+ @org.junit.Test
+ public void testLargerThanBudgetRequests() {
+ HyracksDataException hde = null;
+ try {
+ ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE);
+ fmm.get(32 * DEFAULT_FRAME_SIZE);
+ } catch (HyracksDataException e) {
+ hde = e;
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ }
+ Assert.assertNotNull(hde);
+ Assert.assertNull(cause);
+ }
+
+ @org.junit.Test
+ public void testLargerThanBudgetSubscribe() {
+ HyracksDataException hde = null;
+ try {
+ ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", DEFAULT_FRAME_SIZE * 16, DEFAULT_FRAME_SIZE);
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 32);
+ FrameAction frameAction = new FrameAction();
+ frameAction.setFrame(buffer);
+ fmm.subscribe(frameAction);
+ } catch (HyracksDataException e) {
+ hde = e;
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ }
+ Assert.assertNotNull(hde);
+ Assert.assertNull(cause);
+ }
+
+ @org.junit.Test
+ public void testgetWhileSubscribersExist() {
+ try {
+ AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
+ Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
+ ConcurrentFramePool fmm =
+ new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+ int i = 0;
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
+ FrameAction frameAction = new FrameAction();
+ frameAction.setFrame(buffer);
+ while (!fmm.subscribe(frameAction)) {
+ buffers.put(frameAction.retrieve());
+ i++;
+ }
+ // One subscriber.
+ // Check that all frames have been consumed
+ Assert.assertEquals(i, NUM_FRAMES);
+ // Release a frame (That will be handed out to the subscriber)
+ fmm.release(buffers.take());
+ // Check that all frames have been consumed (since the released frame have been handed to the consumer)
+ Assert.assertEquals(fmm.remaining(), 0);
+ buffers.put(frameAction.retrieve());
+ // Create another subscriber that takes frames of double the size
+ ByteBuffer bufferTimes2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 2);
+ LinkedBlockingDeque<ByteBuffer> buffersTimes2 = new LinkedBlockingDeque<>();
+ FrameAction frameActionTimes2 = new FrameAction();
+ frameActionTimes2.setFrame(bufferTimes2);
+ Assert.assertEquals(true, fmm.subscribe(frameActionTimes2));
+ // release a small one
+ fmm.release(buffers.take());
+ Assert.assertEquals(fmm.remaining(), 1);
+ // Check that a small get fails
+ Assert.assertEquals(null, fmm.get());
+ // release another small one
+ fmm.release(buffers.take());
+ // Check that no small frames exists in the pool since subscriber request was satisfied
+ Assert.assertEquals(fmm.remaining(), 0);
+ buffersTimes2.add(frameActionTimes2.retrieve());
+ fmm.release(buffers);
+ fmm.release(bufferTimes2);
+ Assert.assertEquals(fmm.remaining(), NUM_FRAMES);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail(th.getMessage());
+ } finally {
+ Assert.assertNull(cause);
}
}
@@ -362,7 +492,8 @@
try {
fmm.release(stack.pop());
} catch (HyracksDataException e) {
- Assert.fail();
+ e.printStackTrace();
+ cause = e;
}
}
} else {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
new file mode 100644
index 0000000..705d5e3
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -0,0 +1,794 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
+import org.apache.asterix.external.feed.management.ConcurrentFramePool;
+import org.apache.asterix.external.feed.management.FeedConnectionId;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.TestControlledFrameWriter;
+import org.apache.hyracks.api.test.TestFrameWriter;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+@RunWith(PowerMockRunner.class)
+public class InputHandlerTest extends TestCase {
+
+ private static final int DEFAULT_FRAME_SIZE = 32768;
+ private static final int NUM_FRAMES = 128;
+ private static final long FEED_MEM_BUDGET = DEFAULT_FRAME_SIZE * NUM_FRAMES;
+ private static final String DATAVERSE = "dataverse";
+ private static final String DATASET = "dataset";
+ private static final String FEED = "feed";
+ private static final String NODE_ID = "NodeId";
+ private static final float DISCARD_ALLOWANCE = 0.15f;
+ private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1);
+ private volatile static HyracksDataException cause = null;
+
+ public InputHandlerTest(String testName) {
+ super(testName);
+ }
+
+ public static Test suite() {
+ return new TestSuite(InputHandlerTest.class);
+ }
+
+ private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer,
+ FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException {
+ FrameTupleAccessor fta = Mockito.mock(FrameTupleAccessor.class);
+ FeedId feedId = new FeedId(DATAVERSE, FEED);
+ FeedConnectionId connectionId = new FeedConnectionId(feedId, DATASET);
+ FeedRuntimeId runtimeId =
+ new FeedRuntimeId(feedId, FeedRuntimeType.COLLECT, 0, FeedRuntimeId.DEFAULT_TARGET_ID);
+ return new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, writer, fpa, fta, framePool);
+ }
+
+ /*
+ * Testing the following scenarios
+ * 01. Positive Frames memory budget with fixed size frames, no spill, no discard.
+ * 02. Positive Frames memory budget with variable size frames, no spill, no discard.
+ * 03. Positive Frames memory budget with fixed size frames, with spill, no discard.
+ * 04. Positive Frames memory budget with variable size frames, with spill, no discard.
+ * 05. Positive Frames memory budget with fixed size frames, no spill, with discard.
+ * 06. Positive Frames memory budget with variable size frames, no spill, with discard.
+ * 07. Positive Frames memory budget with fixed size frames, with spill, with discard.
+ * 08. Positive Frames memory budget with variable size frames, with spill, with discard.
+ * 09. 0 Frames memory budget with fixed size frames, with spill, no discard.
+ * 10. 0 Frames memory budget with variable size frames, with spill, no discard.
+ * 11. TODO 0 Frames memory budget with fixed size frames, with spill, with discard.
+ * 12. TODO 0 Frames memory budget with variable size frames, with spill, with discard.
+ * 13. TODO Test exception handling with Open, NextFrame,Flush,Close,Fail exception throwing FrameWriter
+ * 14. TODO Test exception while waiting for subscription
+ */
+
+ private static FeedPolicyAccessor createFeedPolicyAccessor(boolean spill, boolean discard, long spillBudget,
+ float discardFraction) {
+ FeedPolicyAccessor fpa = Mockito.mock(FeedPolicyAccessor.class);
+ Mockito.when(fpa.bufferingEnabled()).thenReturn(true);
+ Mockito.when(fpa.spillToDiskOnCongestion()).thenReturn(spill);
+ Mockito.when(fpa.getMaxSpillOnDisk()).thenReturn(spillBudget);
+ Mockito.when(fpa.discardOnCongestion()).thenReturn(discard);
+ Mockito.when(fpa.getMaxFractionDiscard()).thenReturn(discardFraction);
+ return fpa;
+ }
+
+ @org.junit.Test
+ public void testZeroMemoryVarSizeFrameWithDiskNoDiscard() {
+ try {
+ int numRounds = 5;
+ Random random = new Random();
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // No spill, No discard
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+ handler.open();
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ handler.nextFrame(buffer);
+ Assert.assertEquals(0, handler.getNumProcessedInMemory());
+ Assert.assertEquals(1, handler.getNumSpilled());
+ // add NUM_FRAMES times
+ for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+ int multiplier = random.nextInt(10) + 1;
+ buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+ handler.nextFrame(buffer);
+ }
+ // Check that no records were discarded
+ Assert.assertEquals(handler.getNumDiscarded(), 0);
+ // Check that no records were spilled
+ Assert.assertEquals(NUM_FRAMES * numRounds + 1, handler.getNumSpilled());
+ writer.validate(false);
+ handler.close();
+ // Check that nextFrame was called
+ Assert.assertEquals(NUM_FRAMES * numRounds + 1, writer.nextFrameCount());
+ writer.validate(true);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ } finally {
+ Assert.assertNull(cause);
+ }
+ }
+
+ @org.junit.Test
+ public void testZeroMemoryFixedSizeFrameWithDiskNoDiscard() {
+ try {
+ int numRounds = 10;
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // No spill, No discard
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+ handler.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ handler.nextFrame(frame.getBuffer());
+ Assert.assertEquals(0, handler.getNumProcessedInMemory());
+ Assert.assertEquals(1, handler.getNumSpilled());
+ // add NUM_FRAMES times
+ for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ // Check that no records were discarded
+ Assert.assertEquals(handler.getNumDiscarded(), 0);
+ // Check that no records were spilled
+ Assert.assertEquals(NUM_FRAMES * numRounds + 1, handler.getNumSpilled());
+ writer.validate(false);
+ handler.close();
+ // Check that nextFrame was called
+ Assert.assertEquals(NUM_FRAMES * numRounds + 1, writer.nextFrameCount());
+ writer.validate(true);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ } finally {
+ Assert.assertNull(cause);
+ }
+
+ }
+
+ /*
+ * Spill = false;
+ * Discard = true; discard only 5%
+ * Fixed size frames
+ */
+ @org.junit.Test
+ public void testMemoryVarSizeFrameWithSpillWithDiscard() {
+ try {
+ int numberOfMemoryFrames = 50;
+ int numberOfSpillFrames = 50;
+ int notDiscarded = 0;
+ int totalMinFrames = 0;
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // Spill budget = Memory budget, No discard
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool =
+ new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+ handler.open();
+ ByteBuffer buffer1 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ ByteBuffer buffer2 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 2);
+ ByteBuffer buffer3 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 3);
+ ByteBuffer buffer4 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 4);
+ ByteBuffer buffer5 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * 5);
+ while (true) {
+ if (totalMinFrames + 1 < numberOfMemoryFrames) {
+ handler.nextFrame(buffer1);
+ notDiscarded++;
+ totalMinFrames++;
+ } else {
+ break;
+ }
+ if (totalMinFrames + 2 < numberOfMemoryFrames) {
+ notDiscarded++;
+ totalMinFrames += 2;
+ handler.nextFrame(buffer2);
+ } else {
+ break;
+ }
+ if (totalMinFrames + 3 < numberOfMemoryFrames) {
+ notDiscarded++;
+ totalMinFrames += 3;
+ handler.nextFrame(buffer3);
+ } else {
+ break;
+ }
+ }
+ // Now we need to verify that the frame pool memory has been consumed!
+ Assert.assertTrue(framePool.remaining() < 3);
+ Assert.assertEquals(0, handler.getNumSpilled());
+ Assert.assertEquals(0, handler.getNumStalled());
+ Assert.assertEquals(0, handler.getNumDiscarded());
+ while (true) {
+ if (handler.getNumSpilled() < numberOfSpillFrames) {
+ notDiscarded++;
+ handler.nextFrame(buffer3);
+ } else {
+ break;
+ }
+ if (handler.getNumSpilled() < numberOfSpillFrames) {
+ notDiscarded++;
+ handler.nextFrame(buffer4);
+ } else {
+ break;
+ }
+ if (handler.getNumSpilled() < numberOfSpillFrames) {
+ notDiscarded++;
+ handler.nextFrame(buffer5);
+ } else {
+ break;
+ }
+ }
+ Assert.assertTrue(framePool.remaining() < 3);
+ Assert.assertEquals(handler.framesOnDisk(), handler.getNumSpilled());
+ Assert.assertEquals(handler.framesOnDisk(), numberOfSpillFrames);
+ Assert.assertEquals(0, handler.getNumStalled());
+ Assert.assertEquals(0, handler.getNumDiscarded());
+ // We can only discard one frame
+ double numDiscarded = 0;
+ boolean nextShouldDiscard =
+ ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+ while (nextShouldDiscard) {
+ handler.nextFrame(buffer5);
+ numDiscarded++;
+ nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+ }
+ Assert.assertTrue(framePool.remaining() < 3);
+ Assert.assertEquals(handler.framesOnDisk(), handler.getNumSpilled());
+ Assert.assertEquals(0, handler.getNumStalled());
+ Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
+ // Next Call should block since we're exceeding the discard allowance
+ Future<?> result = EXECUTOR.submit(new Pusher(buffer5, handler));
+ if (result.isDone()) {
+ Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
+ }
+ // consume memory frames
+ writer.unfreeze();
+ result.get();
+ handler.close();
+ Assert.assertEquals(writer.nextFrameCount(), notDiscarded + 1);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertNull(cause);
+ }
+
+ /*
+ * Spill = true;
+ * Discard = true
+ * Fixed size frames
+ */
+ @org.junit.Test
+ public void testMemoryFixedSizeFrameWithSpillWithDiscard() {
+ try {
+ int numberOfMemoryFrames = 50;
+ int numberOfSpillFrames = 50;
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // Spill budget = Memory budget, No discard
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool =
+ new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+ handler.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ for (int i = 0; i < numberOfMemoryFrames; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ // Now we need to verify that the frame pool memory has been consumed!
+ Assert.assertEquals(0, framePool.remaining());
+ Assert.assertEquals(numberOfMemoryFrames, handler.getTotal());
+ Assert.assertEquals(0, handler.getNumSpilled());
+ Assert.assertEquals(0, handler.getNumStalled());
+ Assert.assertEquals(0, handler.getNumDiscarded());
+ for (int i = 0; i < numberOfSpillFrames; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ Assert.assertEquals(0, framePool.remaining());
+ Assert.assertEquals(numberOfMemoryFrames + numberOfSpillFrames, handler.getTotal());
+ Assert.assertEquals(numberOfSpillFrames, handler.getNumSpilled());
+ Assert.assertEquals(0, handler.getNumStalled());
+ Assert.assertEquals(0, handler.getNumDiscarded());
+ // We can only discard one frame
+ double numDiscarded = 0;
+ boolean nextShouldDiscard =
+ ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+ while (nextShouldDiscard) {
+ handler.nextFrame(frame.getBuffer());
+ numDiscarded++;
+ nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+ }
+ Assert.assertEquals(0, framePool.remaining());
+ Assert.assertEquals((int) (numberOfMemoryFrames + numberOfSpillFrames + numDiscarded), handler.getTotal());
+ Assert.assertEquals(numberOfSpillFrames, handler.getNumSpilled());
+ Assert.assertEquals(0, handler.getNumStalled());
+ Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
+ // Next Call should block since we're exceeding the discard allowance
+ Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
+ if (result.isDone()) {
+ Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
+ } else {
+ Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded());
+ }
+ // consume memory frames
+ writer.unfreeze();
+ result.get();
+ handler.close();
+ Assert.assertTrue(result.isDone());
+ Assert.assertEquals(writer.nextFrameCount(), numberOfMemoryFrames + numberOfSpillFrames + 1);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertNull(cause);
+ }
+
+ /*
+ * Spill = false;
+ * Discard = true; discard only 5%
+ * Fixed size frames
+ */
+ @org.junit.Test
+ public void testMemoryVariableSizeFrameNoSpillWithDiscard() {
+ try {
+ int discardTestFrames = 100;
+ Random random = new Random();
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // Spill budget = Memory budget, No discard
+ FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool =
+ new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+ handler.open();
+ // add NUM_FRAMES times
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ int multiplier = 1;
+ int numFrames = 0;
+ // add NUM_FRAMES times
+ while ((multiplier <= framePool.remaining())) {
+ numFrames++;
+ handler.nextFrame(buffer);
+ multiplier = random.nextInt(10) + 1;
+ buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+ }
+ // Next call should NOT block but should discard.
+ double numDiscarded = 0.0;
+ boolean nextShouldDiscard =
+ ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+ while (nextShouldDiscard) {
+ handler.nextFrame(buffer);
+ numDiscarded++;
+ nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+ }
+ Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
+ if (result.isDone()) {
+ Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
+ } else {
+ // Check that no records were discarded
+ assertEquals((int) numDiscarded, handler.getNumDiscarded());
+ // Check that one frame is spilled
+ assertEquals(handler.getNumSpilled(), 0);
+ }
+ // consume memory frames
+ writer.unfreeze();
+ result.get();
+ handler.close();
+ Assert.assertEquals(writer.nextFrameCount(), numFrames + 1);
+ // exit
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertNull(cause);
+ }
+
+ /*
+ * Spill = false;
+ * Discard = true; discard only 5%
+ * Fixed size frames
+ */
+ @org.junit.Test
+ public void testMemoryFixedSizeFrameNoSpillWithDiscard() {
+ try {
+ int discardTestFrames = 100;
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // Spill budget = Memory budget, No discard
+ FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool =
+ new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+ handler.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ // add NUM_FRAMES times
+ for (int i = 0; i < discardTestFrames; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ // Next 5 calls call should NOT block but should discard.
+ double numDiscarded = 0.0;
+ boolean nextShouldDiscard =
+ ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+ while (nextShouldDiscard) {
+ handler.nextFrame(frame.getBuffer());
+ numDiscarded++;
+ nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+ }
+ // Next Call should block since we're exceeding the discard allowance
+ Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
+ if (result.isDone()) {
+ Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance");
+ } else {
+ // Check that no records were discarded
+ assertEquals((int) numDiscarded, handler.getNumDiscarded());
+ // Check that one frame is spilled
+ assertEquals(handler.getNumSpilled(), 0);
+ }
+ // consume memory frames
+ writer.unfreeze();
+ result.get();
+ handler.close();
+ Assert.assertEquals(writer.nextFrameCount(), discardTestFrames + 1);
+ // exit
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertNull(cause);
+ }
+
+ /*
+ * Spill = true;
+ * Discard = false;
+ * Fixed size frames
+ */
+ @org.junit.Test
+ public void testMemoryFixedSizeFrameWithSpillNoDiscard() {
+ try {
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // Spill budget = Memory budget, No discard
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+ handler.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ // add NUM_FRAMES times
+ for (int i = 0; i < NUM_FRAMES; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ // Next call should NOT block. we will do it in a different thread
+ Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
+ result.get();
+ // Check that no records were discarded
+ assertEquals(handler.getNumDiscarded(), 0);
+ // Check that one frame is spilled
+ assertEquals(handler.getNumSpilled(), 1);
+ // consume memory frames
+ writer.unfreeze();
+ handler.close();
+ Assert.assertEquals(handler.framesOnDisk(), 0);
+ // exit
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertNull(cause);
+ }
+
+ /*
+ * Spill = false;
+ * Discard = false;
+ * Fixed size frames
+ * Very fast next operator
+ */
+ @org.junit.Test
+ public void testMemoryFixedSizeFrameNoDiskNoDiscardFastConsumer() {
+ try {
+ int numRounds = 10;
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // No spill, No discard
+ FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+ handler.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ // add NUM_FRAMES times
+ for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ // Check that no records were discarded
+ Assert.assertEquals(handler.getNumDiscarded(), 0);
+ // Check that no records were spilled
+ Assert.assertEquals(handler.getNumSpilled(), 0);
+ writer.validate(false);
+ handler.close();
+ // Check that nextFrame was called
+ Assert.assertEquals(NUM_FRAMES * numRounds, writer.nextFrameCount());
+ writer.validate(true);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertNull(cause);
+ }
+
+ /*
+ * Spill = false;
+ * Discard = false;
+ * Fixed size frames
+ * Slow next operator
+ */
+ @org.junit.Test
+ public void testMemoryFixedSizeFrameNoDiskNoDiscardSlowConsumer() {
+ try {
+ int numRounds = 10;
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // No spill, No discard
+ FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+ handler.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ writer.setNextDuration(1);
+ // add NUM_FRAMES times
+ for (int i = 0; i < NUM_FRAMES * numRounds; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ // Check that no records were discarded
+ Assert.assertEquals(handler.getNumDiscarded(), 0);
+ // Check that no records were spilled
+ Assert.assertEquals(handler.getNumSpilled(), 0);
+ // Check that nextFrame was called
+ writer.validate(false);
+ handler.close();
+ Assert.assertEquals(writer.nextFrameCount(), (NUM_FRAMES * numRounds));
+ writer.validate(true);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertNull(cause);
+ }
+
+ /*
+ * Spill = false
+ * Discard = false
+ * VarSizeFrame
+ */
+ public void testMemoryVarSizeFrameNoDiskNoDiscard() {
+ try {
+ Random random = new Random();
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // No spill, No discard
+ FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+ handler.open();
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ int multiplier = 1;
+ // add NUM_FRAMES times
+ while ((multiplier <= framePool.remaining())) {
+ handler.nextFrame(buffer);
+ multiplier = random.nextInt(10) + 1;
+ buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+ }
+ // we can't satisfy the next request
+ // Next call should block we will do it in a different thread
+ Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
+ // Check that the nextFrame didn't return
+ if (result.isDone()) {
+ Assert.fail();
+ }
+ // Check that no records were discarded
+ assertEquals(handler.getNumDiscarded(), 0);
+ // Check that no records were spilled
+ assertEquals(handler.getNumSpilled(), 0);
+ // Check that number of stalled is not greater than 1
+ Assert.assertTrue(handler.getNumStalled() <= 1);
+ writer.unfreeze();
+ result.get();
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertNull(cause);
+ }
+
+ /*
+ * Spill = true;
+ * Discard = false;
+ * Variable size frames
+ */
+ @org.junit.Test
+ public void testMemoryVarSizeFrameWithSpillNoDiscard() {
+ try {
+ Random random = new Random();
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // Spill budget = Memory budget, No discard
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+ handler.open();
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ int multiplier = 1;
+ // add NUM_FRAMES times
+ while ((multiplier <= framePool.remaining())) {
+ handler.nextFrame(buffer);
+ multiplier = random.nextInt(10) + 1;
+ buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+ }
+ // Next call should Not block. we will do it in a different thread
+ Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
+ result.get();
+ // Check that no records were discarded
+ assertEquals(handler.getNumDiscarded(), 0);
+ // Check that one frame is spilled
+ assertEquals(handler.getNumSpilled(), 1);
+ // consume memory frames
+ while (!handler.getInternalBuffer().isEmpty()) {
+ writer.kick();
+ }
+ // There should be 1 frame on disk
+ Assert.assertEquals(1, handler.framesOnDisk());
+ writer.unfreeze();
+ result.get();
+ Assert.assertEquals(0, handler.framesOnDisk());
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertNull(cause);
+ }
+
+ /*
+ * Spill = false;
+ * Discard = false;
+ * Fixed size frames
+ */
+ @org.junit.Test
+ public void testMemoryFixedSizeFrameNoDiskNoDiscard() {
+ try {
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // No spill, No discard
+ FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+ handler.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ // add NUM_FRAMES times
+ for (int i = 0; i < NUM_FRAMES; i++) {
+ handler.nextFrame(frame.getBuffer());
+ }
+ // Next call should block we will do it in a different thread
+ Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler));
+ // Check that the nextFrame didn't return
+ if (result.isDone()) {
+ Assert.fail();
+ } else {
+ // Check that no records were discarded
+ Assert.assertEquals(handler.getNumDiscarded(), 0);
+ // Check that no records were spilled
+ Assert.assertEquals(handler.getNumSpilled(), 0);
+ // Check that no records were discarded
+ // Check that the inputHandler subscribed to the framePool
+ // Check that number of stalled is not greater than 1
+ Assert.assertTrue(handler.getNumStalled() <= 1);
+ writer.kick();
+ }
+ result.get();
+ writer.unfreeze();
+ handler.close();
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
+ }
+ Assert.assertNull(cause);
+ }
+
+ private class Pusher implements Runnable {
+ private final ByteBuffer buffer;
+ private final IFrameWriter writer;
+
+ public Pusher(ByteBuffer buffer, IFrameWriter writer) {
+ this.buffer = buffer;
+ this.writer = writer;
+ }
+
+ @Override
+ public void run() {
+ try {
+ writer.nextFrame(buffer);
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ cause = e;
+ }
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 3961921..9336921 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -39,7 +39,23 @@
<properties>
<root.dir>${basedir}/../..</root.dir>
</properties>
-
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
<dependencies>
<dependency>
<groupId>org.json</groupId>
@@ -77,5 +93,22 @@
<artifactId>hyracks-util</artifactId>
<version>0.2.18-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>2.0.2-beta</version>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>1.6.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.6.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java
new file mode 100644
index 0000000..19998a7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowError.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.mockito.invocation.InvocationOnMock;
+
+public class CountAndThrowError extends CountAnswer {
+ private String errorMessage;
+
+ public CountAndThrowError(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+
+ @Override
+ public Object call() throws HyracksDataException {
+ count++;
+ throw new UnknownError(errorMessage);
+ }
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ count++;
+ throw new UnknownError(errorMessage);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java
new file mode 100644
index 0000000..5a5ad59
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAndThrowException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.mockito.invocation.InvocationOnMock;
+
+public class CountAndThrowException extends CountAnswer {
+ private String errorMessage;
+
+ public CountAndThrowException(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+
+ @Override
+ public Object call() throws HyracksDataException {
+ count++;
+ throw new HyracksDataException(errorMessage);
+ }
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ count++;
+ throw new HyracksDataException(errorMessage);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java
new file mode 100644
index 0000000..e8a6654
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/CountAnswer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class CountAnswer implements Answer<Object> {
+ protected int count = 0;
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ count++;
+ return null;
+ }
+
+ public Object call() throws HyracksDataException {
+ count++;
+ return null;
+ }
+
+ public int getCallCount() {
+ return count;
+ }
+
+ public void reset() {
+ count = 0;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
new file mode 100644
index 0000000..4bddfa9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import java.util.Collection;
+
+public class FrameWriterTestUtils {
+ public static final String EXCEPTION_MESSAGE = "IFrameWriter Exception in the call to the method ";
+ public static final String ERROR_MESSAGE = "IFrameWriter Error in the call to the method ";
+
+ public enum FrameWriterOperation {
+ Open,
+ NextFrame,
+ Fail,
+ Flush,
+ Close
+ }
+
+ public static TestFrameWriter create(Collection<FrameWriterOperation> exceptionThrowingOperations,
+ Collection<FrameWriterOperation> errorThrowingOperations) {
+ CountAnswer openAnswer =
+ createAnswer(FrameWriterOperation.Open, exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer nextAnswer =
+ createAnswer(FrameWriterOperation.NextFrame, exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer flushAnswer =
+ createAnswer(FrameWriterOperation.Flush, exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer failAnswer =
+ createAnswer(FrameWriterOperation.Fail, exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer closeAnswer =
+ createAnswer(FrameWriterOperation.Close, exceptionThrowingOperations, errorThrowingOperations);
+ return new TestFrameWriter(openAnswer, nextAnswer, flushAnswer, failAnswer, closeAnswer);
+ }
+
+ public static CountAnswer createAnswer(FrameWriterOperation operation,
+ Collection<FrameWriterOperation> exceptionThrowingOperations,
+ Collection<FrameWriterOperation> errorThrowingOperations) {
+ if (exceptionThrowingOperations.contains(operation)) {
+ return new CountAndThrowException(EXCEPTION_MESSAGE + operation.toString());
+ } else if (exceptionThrowingOperations.contains(operation)) {
+ return new CountAndThrowError(ERROR_MESSAGE + operation.toString());
+ } else {
+ return new CountAnswer();
+ }
+ }
+
+ public static TestControlledFrameWriter create(int initialFrameSize) {
+ return new TestControlledFrameWriter(initialFrameSize);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
new file mode 100644
index 0000000..2a3f70d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class TestControlledFrameWriter extends TestFrameWriter {
+ private boolean frozen = false;
+ private boolean timed = false;
+ private long duration = Long.MAX_VALUE;
+ private final int initialFrameSize;
+ private volatile int currentMultiplier = 0;
+ private volatile int kicks = 0;
+
+ public TestControlledFrameWriter(int initialFrameSize) {
+ super(new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer());
+ this.initialFrameSize = initialFrameSize;
+ }
+
+ public int getCurrentMultiplier() {
+ return currentMultiplier;
+ }
+
+ public synchronized void freeze() {
+ frozen = true;
+ }
+
+ public synchronized void time(long ms) {
+ frozen = true;
+ timed = true;
+ duration = ms;
+ }
+
+ public synchronized void unfreeze() {
+ frozen = false;
+ notify();
+ }
+
+ public synchronized void kick() {
+ kicks++;
+ notify();
+ }
+
+ @Override
+ public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ super.nextFrame(buffer);
+ currentMultiplier = buffer.capacity() / initialFrameSize;
+ if (frozen) {
+ try {
+ if (timed) {
+ wait(duration);
+ } else {
+ while (frozen && kicks == 0) {
+ wait();
+ }
+ kicks--;
+ }
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ currentMultiplier = 0;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
new file mode 100644
index 0000000..b3492fe
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.test;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class TestFrameWriter implements IFrameWriter {
+ private final CountAnswer openAnswer;
+ private final CountAnswer nextAnswer;
+ private final CountAnswer flushAnswer;
+ private final CountAnswer failAnswer;
+ private final CountAnswer closeAnswer;
+ private long openDuration = 0L;
+ private long nextDuration = 0L;
+ private long flushDuration = 0L;
+ private long failDuration = 0L;
+ private long closeDuration = 0L;
+
+ public TestFrameWriter(CountAnswer openAnswer, CountAnswer nextAnswer, CountAnswer flushAnswer,
+ CountAnswer failAnswer, CountAnswer closeAnswer) {
+ this.openAnswer = openAnswer;
+ this.nextAnswer = nextAnswer;
+ this.closeAnswer = closeAnswer;
+ this.flushAnswer = flushAnswer;
+ this.failAnswer = failAnswer;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ delay(openDuration);
+ openAnswer.call();
+ }
+
+ public int openCount() {
+ return openAnswer.getCallCount();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ delay(nextDuration);
+ nextAnswer.call();
+ }
+
+ public int nextFrameCount() {
+ return nextAnswer.getCallCount();
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ delay(flushDuration);
+ flushAnswer.call();
+ }
+
+ public int flushCount() {
+ return flushAnswer.getCallCount();
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ delay(failDuration);
+ failAnswer.call();
+ }
+
+ public int failCount() {
+ return failAnswer.getCallCount();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ delay(closeDuration);
+ closeAnswer.call();
+ }
+
+ public int closeCount() {
+ return closeAnswer.getCallCount();
+ }
+
+ public synchronized boolean validate(boolean finished) {
+ if (failAnswer.getCallCount() > 1 || closeAnswer.getCallCount() > 1 || openAnswer.getCallCount() > 1) {
+ return false;
+ }
+ if (openAnswer.getCallCount() == 0
+ && (nextAnswer.getCallCount() > 0 || failAnswer.getCallCount() > 0 || closeAnswer.getCallCount() > 0)) {
+ return false;
+ }
+ if (finished) {
+ if (closeAnswer.getCallCount() == 0 && (nextAnswer.getCallCount() > 0 || failAnswer.getCallCount() > 0
+ || openAnswer.getCallCount() > 0)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void reset() {
+ openAnswer.reset();
+ nextAnswer.reset();
+ flushAnswer.reset();
+ failAnswer.reset();
+ closeAnswer.reset();
+ }
+
+ public long getOpenDuration() {
+ return openDuration;
+ }
+
+ public void setOpenDuration(long openDuration) {
+ this.openDuration = openDuration;
+ }
+
+ public long getNextDuration() {
+ return nextDuration;
+ }
+
+ public void setNextDuration(long nextDuration) {
+ this.nextDuration = nextDuration;
+ }
+
+ public long getFlushDuration() {
+ return flushDuration;
+ }
+
+ public void setFlushDuration(long flushDuration) {
+ this.flushDuration = flushDuration;
+ }
+
+ public long getFailDuration() {
+ return failDuration;
+ }
+
+ public void setFailDuration(long failDuration) {
+ this.failDuration = failDuration;
+ }
+
+ public long getCloseDuration() {
+ return closeDuration;
+ }
+
+ public void setCloseDuration(long closeDuration) {
+ this.closeDuration = closeDuration;
+ }
+
+ private void delay(long duration) throws HyracksDataException {
+ if (duration > 0) {
+ try {
+ synchronized (this) {
+ wait(duration);
+ }
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
index 581fde4..d07d633 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
@@ -39,6 +39,13 @@
<dependencies>
<dependency>
<groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.18-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-storage-common</artifactId>
<version>0.2.18-SNAPSHOT</version>
<type>jar</type>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
index df3a211..d3e7a3a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -30,6 +30,9 @@
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.CountAndThrowError;
+import org.apache.hyracks.api.test.CountAndThrowException;
+import org.apache.hyracks.api.test.CountAnswer;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -92,8 +95,8 @@
public boolean validate(boolean finished) {
// get number of open calls
int openCount = openException.getCallCount() + openNormal.getCallCount() + openError.getCallCount();
- int nextFrameCount = nextFrameException.getCallCount() + nextFrameNormal.getCallCount()
- + nextFrameError.getCallCount();
+ int nextFrameCount =
+ nextFrameException.getCallCount() + nextFrameNormal.getCallCount() + nextFrameError.getCallCount();
int failCount = failException.getCallCount() + failNormal.getCallCount() + failError.getCallCount();
int closeCount = closeException.getCallCount() + closeNormal.getCallCount() + closeError.getCallCount();
@@ -422,8 +425,9 @@
public AbstractTreeIndexOperatorDescriptor[] mockIndexOpDesc() throws HyracksDataException, IndexException {
IIndexDataflowHelperFactory[] indexDataflowHelperFactories = mockIndexHelperFactories();
ISearchOperationCallbackFactory[] searchOpCallbackFactories = mockSearchOpCallbackFactories();
- AbstractTreeIndexOperatorDescriptor[] opDescs = new AbstractTreeIndexOperatorDescriptor[indexDataflowHelperFactories.length
- * searchOpCallbackFactories.length];
+ AbstractTreeIndexOperatorDescriptor[] opDescs =
+ new AbstractTreeIndexOperatorDescriptor[indexDataflowHelperFactories.length
+ * searchOpCallbackFactories.length];
int k = 0;
for (int i = 0; i < indexDataflowHelperFactories.length; i++) {
for (int j = 0; j < searchOpCallbackFactories.length; j++) {
@@ -452,52 +456,6 @@
return opCallback;
}
- public class CountAnswer implements Answer<Object> {
- protected int count = 0;
-
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- count++;
- return null;
- }
-
- public int getCallCount() {
- return count;
- }
-
- public void reset() {
- count = 0;
- }
- }
-
- public class CountAndThrowException extends CountAnswer {
- private String errorMessage;
-
- public CountAndThrowException(String errorMessage) {
- this.errorMessage = errorMessage;
- }
-
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- count++;
- throw new HyracksDataException(errorMessage);
- }
- }
-
- public class CountAndThrowError extends CountAnswer {
- private String errorMessage;
-
- public CountAndThrowError(String errorMessage) {
- this.errorMessage = errorMessage;
- }
-
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- count++;
- throw new UnknownError(errorMessage);
- }
- }
-
public IFrameWriter[] createOutputWriters() throws Exception {
CountAnswer[] opens = new CountAnswer[] { openNormal, openException, openError };
CountAnswer[] nextFrames = new CountAnswer[] { nextFrameNormal, nextFrameException, nextFrameError };