Changed empty queues to stacks. Replaced LinkedList with ArrayDeque.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1072 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
index 8d5f475..185768e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -16,7 +16,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
-import java.util.Queue;
+import java.util.Deque;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
@@ -27,15 +27,15 @@
public class NetworkOutputChannel implements IFrameWriter {
private final ChannelControlBlock ccb;
- private final Queue<ByteBuffer> emptyQueue;
+ private final Deque<ByteBuffer> emptyStack;
private boolean aborted;
public NetworkOutputChannel(IHyracksRootContext ctx, ChannelControlBlock ccb, int nBuffers) {
this.ccb = ccb;
- emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+ emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
for (int i = 0; i < nBuffers; ++i) {
- emptyQueue.add(ByteBuffer.allocateDirect(ctx.getFrameSize()));
+ emptyStack.push(ByteBuffer.allocateDirect(ctx.getFrameSize()));
}
ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
}
@@ -52,7 +52,7 @@
if (aborted) {
throw new HyracksDataException("Connection has been aborted");
}
- destBuffer = emptyQueue.poll();
+ destBuffer = emptyStack.poll();
if (destBuffer != null) {
break;
}
@@ -93,7 +93,7 @@
@Override
public void accept(ByteBuffer buffer) {
synchronized (NetworkOutputChannel.this) {
- emptyQueue.add(buffer);
+ emptyStack.push(buffer);
NetworkOutputChannel.this.notifyAll();
}
}
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index e3d057a..7e11518 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -17,7 +17,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import java.util.LinkedList;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -68,7 +69,7 @@
}
private final class ReadInterface implements IChannelReadInterface {
- private final Queue<ByteBuffer> riEmptyQueue;
+ private final Deque<ByteBuffer> riEmptyStack;
private final IBufferAcceptor eba = new IBufferAcceptor() {
@Override
@@ -78,7 +79,7 @@
}
int delta = buffer.remaining();
synchronized (ChannelControlBlock.this) {
- riEmptyQueue.add(buffer);
+ riEmptyStack.push(buffer);
}
cSet.addPendingCredits(channelId, delta);
}
@@ -91,7 +92,7 @@
private ByteBuffer currentReadBuffer;
ReadInterface() {
- riEmptyQueue = new LinkedList<ByteBuffer>();
+ riEmptyStack = new ArrayDeque<ByteBuffer>();
credits = 0;
}
@@ -111,7 +112,7 @@
return size;
}
if (ri.currentReadBuffer == null) {
- ri.currentReadBuffer = ri.riEmptyQueue.poll();
+ ri.currentReadBuffer = ri.riEmptyStack.poll();
assert ri.currentReadBuffer != null;
}
int rSize = Math.min(size, ri.currentReadBuffer.remaining());
@@ -200,7 +201,7 @@
private ByteBuffer currentWriteBuffer;
WriteInterface() {
- wiFullQueue = new LinkedList<ByteBuffer>();
+ wiFullQueue = new ArrayDeque<ByteBuffer>();
credits = new AtomicInteger();
eos = false;
eosSent = false;