Reimplement the DatasetNetworkInputChannel using the IInputChannel interface instead of creating a new interface defintion.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2540 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
index c707709..fac2949 100644
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
@@ -21,15 +21,16 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannel;
-import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
 import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
 import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
-public class DatasetNetworkInputChannel implements IDatasetInputChannel {
+public class DatasetNetworkInputChannel implements IInputChannel {
     private static final Logger LOGGER = Logger.getLogger(DatasetNetworkInputChannel.class.getName());
 
     static final int INITIAL_MESSAGE_SIZE = 20;
@@ -48,7 +49,7 @@
 
     private ChannelControlBlock ccb;
 
-    private IDatasetInputChannelMonitor monitor;
+    private IInputChannelMonitor monitor;
 
     private Object attachment;
 
@@ -63,7 +64,7 @@
     }
 
     @Override
-    public void registerMonitor(IDatasetInputChannelMonitor monitor) {
+    public void registerMonitor(IInputChannelMonitor monitor) {
         this.monitor = monitor;
     }
 
@@ -89,7 +90,7 @@
     }
 
     @Override
-    public void open(int frameSize) throws HyracksDataException {
+    public void open(IHyracksCommonContext ctx) throws HyracksDataException {
         try {
             ccb = netManager.connect(remoteAddress);
         } catch (Exception e) {
@@ -98,7 +99,7 @@
         ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
         ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
         for (int i = 0; i < nBuffers; ++i) {
-            ccb.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(frameSize));
+            ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
         }
         ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
         writeBuffer.putLong(jobId.getId());