Merged fullstack_asterix_stabilization -r 2933:3157
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_ioc@3164 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-comm/pom.xml b/hyracks/hyracks-comm/pom.xml
new file mode 100644
index 0000000..c3583699
--- /dev/null
+++ b/hyracks/hyracks-comm/pom.xml
@@ -0,0 +1,36 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-comm</artifactId>
+ <name>hyracks-comm</name>
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-net</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
new file mode 100644
index 0000000..fac2949
--- /dev/null
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.channels;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public class DatasetNetworkInputChannel implements IInputChannel {
+ private static final Logger LOGGER = Logger.getLogger(DatasetNetworkInputChannel.class.getName());
+
+ static final int INITIAL_MESSAGE_SIZE = 20;
+
+ private final IChannelConnectionFactory netManager;
+
+ private final SocketAddress remoteAddress;
+
+ private final JobId jobId;
+
+ private final int partition;
+
+ private final Queue<ByteBuffer> fullQueue;
+
+ private final int nBuffers;
+
+ private ChannelControlBlock ccb;
+
+ private IInputChannelMonitor monitor;
+
+ private Object attachment;
+
+ public DatasetNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, JobId jobId,
+ int partition, int nBuffers) {
+ this.netManager = netManager;
+ this.remoteAddress = remoteAddress;
+ this.jobId = jobId;
+ this.partition = partition;
+ fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+ this.nBuffers = nBuffers;
+ }
+
+ @Override
+ public void registerMonitor(IInputChannelMonitor monitor) {
+ this.monitor = monitor;
+ }
+
+ @Override
+ public void setAttachment(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ @Override
+ public Object getAttachment() {
+ return attachment;
+ }
+
+ @Override
+ public synchronized ByteBuffer getNextBuffer() {
+ return fullQueue.poll();
+ }
+
+ @Override
+ public void recycleBuffer(ByteBuffer buffer) {
+ buffer.clear();
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
+ }
+
+ @Override
+ public void open(IHyracksCommonContext ctx) throws HyracksDataException {
+ try {
+ ccb = netManager.connect(remoteAddress);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+ ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+ for (int i = 0; i < nBuffers; ++i) {
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
+ }
+ ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
+ writeBuffer.putLong(jobId.getId());
+ writeBuffer.putInt(partition);
+ writeBuffer.flip();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Sending partition request for JobId: " + jobId + " partition: " + partition + " on channel: "
+ + ccb);
+ }
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+
+ }
+
+ private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ fullQueue.add(buffer);
+ monitor.notifyDataAvailability(DatasetNetworkInputChannel.this, 1);
+ }
+
+ @Override
+ public void close() {
+ monitor.notifyEndOfStream(DatasetNetworkInputChannel.this);
+ }
+
+ @Override
+ public void error(int ecode) {
+ monitor.notifyFailure(DatasetNetworkInputChannel.this);
+ }
+ }
+
+ private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ // do nothing
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/IChannelConnectionFactory.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/IChannelConnectionFactory.java
new file mode 100644
index 0000000..33179ba
--- /dev/null
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/IChannelConnectionFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.channels;
+
+import java.net.SocketAddress;
+
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public interface IChannelConnectionFactory {
+ public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException;
+}
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
new file mode 100644
index 0000000..aa37b16
--- /dev/null
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.channels;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public class NetworkInputChannel implements IInputChannel {
+ private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
+
+ static final int INITIAL_MESSAGE_SIZE = 20;
+
+ private final IChannelConnectionFactory netManager;
+
+ private final SocketAddress remoteAddress;
+
+ private final PartitionId partitionId;
+
+ private final Queue<ByteBuffer> fullQueue;
+
+ private final int nBuffers;
+
+ private ChannelControlBlock ccb;
+
+ private IInputChannelMonitor monitor;
+
+ private Object attachment;
+
+ public NetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress,
+ PartitionId partitionId, int nBuffers) {
+ this.netManager = netManager;
+ this.remoteAddress = remoteAddress;
+ this.partitionId = partitionId;
+ fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+ this.nBuffers = nBuffers;
+ }
+
+ @Override
+ public void registerMonitor(IInputChannelMonitor monitor) {
+ this.monitor = monitor;
+ }
+
+ @Override
+ public void setAttachment(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ @Override
+ public Object getAttachment() {
+ return attachment;
+ }
+
+ @Override
+ public synchronized ByteBuffer getNextBuffer() {
+ return fullQueue.poll();
+ }
+
+ @Override
+ public void recycleBuffer(ByteBuffer buffer) {
+ buffer.clear();
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
+ }
+
+ @Override
+ public void open(IHyracksCommonContext ctx) throws HyracksDataException {
+ try {
+ ccb = netManager.connect(remoteAddress);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+ ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+ for (int i = 0; i < nBuffers; ++i) {
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
+ }
+ ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
+ writeBuffer.putLong(partitionId.getJobId().getId());
+ writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
+ writeBuffer.putInt(partitionId.getSenderIndex());
+ writeBuffer.putInt(partitionId.getReceiverIndex());
+ writeBuffer.flip();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Sending partition request: " + partitionId + " on channel: " + ccb);
+ }
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+
+ }
+
+ private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ fullQueue.add(buffer);
+ monitor.notifyDataAvailability(NetworkInputChannel.this, 1);
+ }
+
+ @Override
+ public void close() {
+ monitor.notifyEndOfStream(NetworkInputChannel.this);
+ }
+
+ @Override
+ public void error(int ecode) {
+ monitor.notifyFailure(NetworkInputChannel.this);
+ }
+ }
+
+ private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ // do nothing
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
new file mode 100644
index 0000000..812a2de
--- /dev/null
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.channels;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public class NetworkOutputChannel implements IFrameWriter {
+ private final ChannelControlBlock ccb;
+
+ private final int nBuffers;
+
+ private final Deque<ByteBuffer> emptyStack;
+
+ private boolean aborted;
+
+ public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
+ this.ccb = ccb;
+ this.nBuffers = nBuffers;
+ emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
+ ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+ }
+
+ public void setFrameSize(int frameSize) {
+ for (int i = 0; i < nBuffers; ++i) {
+ emptyStack.push(ByteBuffer.allocateDirect(frameSize));
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ ByteBuffer destBuffer = null;
+ synchronized (this) {
+ while (true) {
+ if (aborted) {
+ throw new HyracksDataException("Connection has been aborted");
+ }
+ destBuffer = emptyStack.poll();
+ if (destBuffer != null) {
+ break;
+ }
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ buffer.position(0);
+ buffer.limit(destBuffer.capacity());
+ destBuffer.clear();
+ destBuffer.put(buffer);
+ destBuffer.flip();
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(destBuffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
+ }
+
+ public void abort() {
+ ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+ synchronized (NetworkOutputChannel.this) {
+ aborted = true;
+ NetworkOutputChannel.this.notifyAll();
+ }
+ }
+
+ private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ synchronized (NetworkOutputChannel.this) {
+ emptyStack.push(buffer);
+ NetworkOutputChannel.this.notifyAll();
+ }
+ }
+ }
+}
\ No newline at end of file