Implement a network input channel for dataset clients to read the results over the network.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2510 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannel.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannel.java
new file mode 100644
index 0000000..635188e
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannel.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IDatasetInputChannel {
+    public void registerMonitor(IDatasetInputChannelMonitor monitor);
+
+    public void setAttachment(Object attachment);
+
+    public Object getAttachment();
+
+    public ByteBuffer getNextBuffer();
+
+    public void recycleBuffer(ByteBuffer buffer);
+
+    public void open(int frameSize) throws HyracksDataException;
+
+    public void close() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java
new file mode 100644
index 0000000..d184331
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+public interface IDatasetInputChannelMonitor {
+    public void notifyFailure(IDatasetInputChannel channel);
+
+    public void notifyDataAvailability(IDatasetInputChannel channel, int nFrames);
+
+    public void notifyEndOfStream(IDatasetInputChannel channel);
+
+    public boolean eosReached();
+
+    public boolean failed();
+
+    public int getNFramesAvailable();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
new file mode 100644
index 0000000..c707709
--- /dev/null
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.channels;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannel;
+import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public class DatasetNetworkInputChannel implements IDatasetInputChannel {
+    private static final Logger LOGGER = Logger.getLogger(DatasetNetworkInputChannel.class.getName());
+
+    static final int INITIAL_MESSAGE_SIZE = 20;
+
+    private final IChannelConnectionFactory netManager;
+
+    private final SocketAddress remoteAddress;
+
+    private final JobId jobId;
+
+    private final int partition;
+
+    private final Queue<ByteBuffer> fullQueue;
+
+    private final int nBuffers;
+
+    private ChannelControlBlock ccb;
+
+    private IDatasetInputChannelMonitor monitor;
+
+    private Object attachment;
+
+    public DatasetNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, JobId jobId,
+            int partition, int nBuffers) {
+        this.netManager = netManager;
+        this.remoteAddress = remoteAddress;
+        this.jobId = jobId;
+        this.partition = partition;
+        fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+        this.nBuffers = nBuffers;
+    }
+
+    @Override
+    public void registerMonitor(IDatasetInputChannelMonitor monitor) {
+        this.monitor = monitor;
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    @Override
+    public synchronized ByteBuffer getNextBuffer() {
+        return fullQueue.poll();
+    }
+
+    @Override
+    public void recycleBuffer(ByteBuffer buffer) {
+        buffer.clear();
+        ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
+    }
+
+    @Override
+    public void open(int frameSize) throws HyracksDataException {
+        try {
+            ccb = netManager.connect(remoteAddress);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+        ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+        for (int i = 0; i < nBuffers; ++i) {
+            ccb.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(frameSize));
+        }
+        ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
+        writeBuffer.putLong(jobId.getId());
+        writeBuffer.putInt(partition);
+        writeBuffer.flip();
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Sending partition request for JobId: " + jobId + " partition: " + partition + " on channel: "
+                    + ccb);
+        }
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+
+    }
+
+    private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            fullQueue.add(buffer);
+            monitor.notifyDataAvailability(DatasetNetworkInputChannel.this, 1);
+        }
+
+        @Override
+        public void close() {
+            monitor.notifyEndOfStream(DatasetNetworkInputChannel.this);
+        }
+
+        @Override
+        public void error(int ecode) {
+            monitor.notifyFailure(DatasetNetworkInputChannel.this);
+        }
+    }
+
+    private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            // do nothing
+        }
+    }
+}
\ No newline at end of file