Introduce MessagingNetworkManager for NC2NC AppMessaging

This change introduces MessagingNetworkManager to NodeControllerService.
The MessagingNetworkManager is used to open channels that are used for NC2NC
application messaging. The read and write interfaces of the messaging channels
are set by a MessagingChannelInterfaceFactory which is set by the application.

Change-Id: I5c0bd7c11c1e78954ebceff49cb274d8073a64bd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/897
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index 2f34465..bd6dae9 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.common.memory.ConcurrentFramePool;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.log4j.Logger;
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 63cb2ce..44bc3bf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -109,6 +109,7 @@
         ncConfig.clusterNetIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
         ncConfig.dataIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
         ncConfig.resultIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
+        ncConfig.messagingIPAddress = Inet4Address.getLoopbackAddress().getHostAddress();
         ncConfig.nodeId = ncName;
         ncConfig.resultTTL = 30000;
         ncConfig.resultSweepThreshold = 1000;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
index c73ab0f..38d7da3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
@@ -45,6 +45,7 @@
 import org.apache.asterix.common.config.AsterixStorageProperties;
 import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
+import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.context.AsterixFileMapManager;
 import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -113,6 +114,7 @@
     private AsterixFeedProperties feedProperties;
     private AsterixBuildProperties buildProperties;
     private AsterixReplicationProperties replicationProperties;
+    private MessagingProperties messagingProperties;
 
     private AsterixThreadExecutor threadExecutor;
     private IDatasetLifecycleManager datasetLifecycleManager;
@@ -161,6 +163,7 @@
         buildProperties = new AsterixBuildProperties(propertiesAccessor);
         replicationProperties = new AsterixReplicationProperties(propertiesAccessor,
                 AsterixClusterProperties.INSTANCE.getCluster());
+        messagingProperties = new MessagingProperties(propertiesAccessor);
         this.metadataRmiPort = metadataRmiPort;
         libraryManager = new ExternalLibraryManager();
         if (extensions != null) {
@@ -383,6 +386,11 @@
     }
 
     @Override
+    public MessagingProperties getMessagingProperties() {
+        return messagingProperties;
+    }
+
+    @Override
     public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
         return datasetLifecycleManager.getOperationTracker(datasetID);
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 582ff62..a8bc48f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -34,13 +34,14 @@
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
 import org.apache.asterix.messaging.NCMessageBroker;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
 import org.apache.asterix.om.util.AsterixClusterProperties;
@@ -116,8 +117,14 @@
         }
         runtimeContext.initialize(initialRun);
         ncApplicationContext.setApplicationObject(runtimeContext);
-        messageBroker = new NCMessageBroker((NodeControllerService) ncAppCtx.getControllerService());
+        MessagingProperties messagingProperties = ((IAsterixPropertiesProvider) runtimeContext)
+                .getMessagingProperties();
+        messageBroker = new NCMessageBroker((NodeControllerService) ncAppCtx.getControllerService(),
+                messagingProperties);
         ncApplicationContext.setMessageBroker(messageBroker);
+        MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory(
+                (NCMessageBroker) messageBroker, messagingProperties);
+        ncApplicationContext.setMessagingChannelInterfaceFactory(interfaceFactory);
 
         boolean replicationEnabled = AsterixClusterProperties.INSTANCE.isReplicationEnabled();
         boolean autoFailover = AsterixClusterProperties.INSTANCE.isAutoFailoverEnabled();
@@ -203,7 +210,7 @@
     @Override
     public void notifyStartupComplete() throws Exception {
         //Send max resource id on this NC to the CC
-        ((INCMessageBroker) ncApplicationContext.getMessageBroker()).reportMaxResourceId();
+        ((NCMessageBroker) ncApplicationContext.getMessageBroker()).reportMaxResourceId();
 
         AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
                 .getMetadataProperties();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java
new file mode 100644
index 0000000..33f16da
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelInterfaceFactory.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.MessagingProperties;
+import org.apache.asterix.common.memory.ConcurrentFramePool;
+import org.apache.asterix.common.memory.FrameAction;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IBufferFactory;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.comm.IChannelReadInterface;
+import org.apache.hyracks.api.comm.IChannelWriteInterface;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+
+public class MessagingChannelInterfaceFactory implements IChannelInterfaceFactory {
+
+    private static final Logger LOGGER = Logger.getLogger(MessagingChannelInterfaceFactory.class.getName());
+
+    private final NCMessageBroker messageBroker;
+    private final ConcurrentFramePool messagingFramePool;
+    /* A single buffer factory used by all messaging channels */
+    private final IBufferFactory appMessagingBufferFactor = new AppMessagingBufferFactory();
+    private final int msgFrameSize;
+    private final int channelFrameCount;
+
+    public MessagingChannelInterfaceFactory(NCMessageBroker messageBroker, MessagingProperties messagingProperties) {
+        this.messageBroker = messageBroker;
+        messagingFramePool = messageBroker.getMessagingFramePool();
+        msgFrameSize = messagingProperties.getFrameSize();
+        channelFrameCount = messagingProperties.getFrameCount();
+    }
+
+    @Override
+    public IChannelReadInterface createReadInterface(IChannelControlBlock ccb) {
+        AppMessagingEmptyBufferAcceptor readEmptyBufferAcceptor = new AppMessagingEmptyBufferAcceptor();
+        MessagingChannelReadInterface readInterface = new MessagingChannelReadInterface(readEmptyBufferAcceptor);
+        readInterface.setBufferFactory(appMessagingBufferFactor, channelFrameCount, msgFrameSize);
+        readInterface.setFullBufferAcceptor(new AppMessagingReadFullBufferAcceptor(readEmptyBufferAcceptor));
+        return readInterface;
+    }
+
+    @Override
+    public IChannelWriteInterface createWriteInterface(IChannelControlBlock ccb) {
+        MessagingChannelWriteInterface writeInterface = new MessagingChannelWriteInterface(ccb);
+        writeInterface.setBufferFactory(appMessagingBufferFactor, channelFrameCount, msgFrameSize);
+        writeInterface.setEmptyBufferAcceptor(new AppMessagingEmptyBufferAcceptor());
+        return writeInterface;
+    }
+
+    /**
+     * A buffer factory based on {@link ConcurrentFramePool}. Used
+     * for messaging channels buffers.
+     */
+    private final class AppMessagingBufferFactory implements IBufferFactory {
+        private final FrameAction frameAction = new FrameAction();
+
+        @Override
+        public ByteBuffer createBuffer() throws HyracksDataException {
+            ByteBuffer buffer = messagingFramePool.get();
+            if (buffer == null) {
+                try {
+                    messagingFramePool.subscribe(frameAction);
+                    buffer = frameAction.retrieve();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+            return buffer;
+        }
+    }
+
+    /**
+     * A buffer acceptor that receives the read buffers containing messages from
+     * other nodes.
+     */
+    private class AppMessagingReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+        private final IBufferAcceptor recycle;
+
+        private AppMessagingReadFullBufferAcceptor(IBufferAcceptor recycle) {
+            this.recycle = recycle;
+        }
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            try {
+                IApplicationMessage receivedMsg = (IApplicationMessage) JavaSerializationUtils
+                        .deserialize(buffer.array());
+                // Queue the received message and free the network IO thread
+                messageBroker.queueReceivedMessage(receivedMsg);
+            } catch (ClassNotFoundException | IOException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, e.getMessage(), e);
+                }
+            } finally {
+                recycle.accept(buffer);
+            }
+        }
+
+        @Override
+        public void close() {
+            // Nothing to close
+        }
+
+        @Override
+        public void error(int ecode) {
+            // Errors are handled via messages
+        }
+    }
+
+    /**
+     * Empty buffer acceptor used to return the used buffers in app messaging
+     * to the buffer pool.
+     */
+    private class AppMessagingEmptyBufferAcceptor implements IBufferAcceptor {
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            try {
+                messagingFramePool.release(buffer);
+            } catch (HyracksDataException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, e.getMessage(), e);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
new file mode 100644
index 0000000..42ec795
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelReadInterface;
+
+public class MessagingChannelReadInterface extends AbstractChannelReadInterface {
+
+    MessagingChannelReadInterface(IBufferAcceptor emptyBufferAcceptor) {
+        this.emptyBufferAcceptor = emptyBufferAcceptor;
+    }
+
+    @Override
+    public int read(SocketChannel sc, int size) throws IOException, NetException {
+        while (true) {
+            if (size <= 0) {
+                return size;
+            }
+            if (currentReadBuffer == null) {
+                currentReadBuffer = bufferFactory.createBuffer();
+            }
+            int rSize = Math.min(size, currentReadBuffer.remaining());
+            if (rSize > 0) {
+                currentReadBuffer.limit(currentReadBuffer.position() + rSize);
+                int len;
+                len = sc.read(currentReadBuffer);
+                if (len < 0) {
+                    throw new NetException("Socket Closed");
+                }
+                size -= len;
+                if (len < rSize) {
+                    return size;
+                }
+            } else {
+                return size;
+            }
+
+            if (currentReadBuffer.remaining() <= 0) {
+                flush();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
new file mode 100644
index 0000000..43c1542
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import org.apache.hyracks.api.comm.IBufferFactory;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.IConnectionWriterState;
+import org.apache.hyracks.api.comm.MuxDemuxCommand;
+import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface;
+
+public class MessagingChannelWriteInterface extends AbstractChannelWriteInterface {
+
+    MessagingChannelWriteInterface(IChannelControlBlock ccb) {
+        super(ccb);
+    }
+
+    @Override
+    public void write(IConnectionWriterState writerState) throws NetException {
+        if (currentWriteBuffer == null) {
+            currentWriteBuffer = wiFullQueue.poll();
+        }
+        if (currentWriteBuffer != null) {
+            int size = currentWriteBuffer.remaining();
+            if (size > 0) {
+                writerState.getCommand().setChannelId(channelId);
+                writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.DATA);
+                writerState.getCommand().setData(size);
+                writerState.reset(currentWriteBuffer, size, ccb);
+            } else {
+                adjustChannelWritability();
+            }
+        } else if (ecode >= 0 && !ecodeSent) {
+            writerState.getCommand().setChannelId(channelId);
+            writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
+            writerState.getCommand().setData(ecode);
+            writerState.reset(null, 0, null);
+            ecodeSent = true;
+            ccb.reportLocalEOS();
+            adjustChannelWritability();
+        } else if (eos && !eosSent) {
+            writerState.getCommand().setChannelId(channelId);
+            writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
+            writerState.getCommand().setData(0);
+            writerState.reset(null, 0, null);
+            eosSent = true;
+            ccb.reportLocalEOS();
+            adjustChannelWritability();
+        }
+    }
+
+    @Override
+    public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize) {
+        credits = limit * frameSize;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 74a5ba2..c7e4ac8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -18,8 +18,11 @@
  */
 package org.apache.asterix.messaging;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -27,6 +30,8 @@
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.config.MessagingProperties;
+import org.apache.asterix.common.memory.ConcurrentFramePool;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
 import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
@@ -46,43 +51,66 @@
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
 public class NCMessageBroker implements INCMessageBroker {
-    private final static Logger LOGGER = Logger.getLogger(NCMessageBroker.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(NCMessageBroker.class.getName());
 
     private final NodeControllerService ncs;
     private final AtomicLong messageId = new AtomicLong(0);
     private final Map<Long, IApplicationMessageCallback> callbacks;
     private final IAsterixAppRuntimeContext appContext;
+    private final LinkedBlockingQueue<IApplicationMessage> receivedMsgsQ;
+    private final ConcurrentFramePool messagingFramePool;
+    private final int maxMsgSize;
 
-    public NCMessageBroker(NodeControllerService ncs) {
+    public NCMessageBroker(NodeControllerService ncs, MessagingProperties messagingProperties) {
         this.ncs = ncs;
         appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
-        callbacks = new ConcurrentHashMap<Long, IApplicationMessageCallback>();
+        callbacks = new ConcurrentHashMap<>();
+        maxMsgSize = messagingProperties.getFrameSize();
+        int messagingMemoryBudget = messagingProperties.getFrameSize() * messagingProperties.getFrameCount();
+        messagingFramePool = new ConcurrentFramePool(ncs.getId(), messagingMemoryBudget,
+                messagingProperties.getFrameSize());
+        receivedMsgsQ = new LinkedBlockingQueue<>();
+        MessageDeliveryService msgDeliverySvc = new MessageDeliveryService();
+        appContext.getThreadExecutor().execute(msgDeliverySvc);
     }
 
     @Override
-    public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception {
-        if (callback != null) {
-            long uniqueMessageId = messageId.incrementAndGet();
-            message.setId(uniqueMessageId);
-            callbacks.put(uniqueMessageId, callback);
-        }
+    public void sendMessageToCC(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception {
+        registerMsgCallback(message, callback);
         try {
             ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
         } catch (Exception e) {
-            if (callback != null) {
-                //remove the callback in case of failure
-                callbacks.remove(message.getId());
-            }
+            handleMsgDeliveryFailure(message);
             throw e;
         }
     }
 
     @Override
+    public void sendMessageToNC(String nodeId, IApplicationMessage message, IApplicationMessageCallback callback)
+            throws Exception {
+        registerMsgCallback(message, callback);
+        try {
+            IChannelControlBlock messagingChannel = ncs.getMessagingNetworkManager().getMessagingChannel(nodeId);
+            sendMessageToChannel(messagingChannel, message);
+        } catch (Exception e) {
+            handleMsgDeliveryFailure(message);
+            throw e;
+        }
+    }
+
+    @Override
+    public void queueReceivedMessage(IApplicationMessage msg) {
+        receivedMsgsQ.offer(msg);
+    }
+
+    @Override
     public void receivedMessage(IMessage message, String nodeId) throws Exception {
         try {
             AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
@@ -122,11 +150,46 @@
                     break;
             }
         } catch (Exception e) {
-            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, e.getMessage(), e);
+            }
             throw e;
         }
     }
 
+    public ConcurrentFramePool getMessagingFramePool() {
+        return messagingFramePool;
+    }
+
+    private void registerMsgCallback(IApplicationMessage message, IApplicationMessageCallback callback) {
+        if (callback != null) {
+            long uniqueMessageId = messageId.incrementAndGet();
+            message.setId(uniqueMessageId);
+            callbacks.put(uniqueMessageId, callback);
+        }
+    }
+
+    private void handleMsgDeliveryFailure(IApplicationMessage message) {
+        callbacks.remove(message.getId());
+    }
+
+    private void sendMessageToChannel(IChannelControlBlock ccb, IApplicationMessage msg) throws IOException {
+        byte[] serializedMsg = JavaSerializationUtils.serialize(msg);
+        if (serializedMsg.length > maxMsgSize) {
+            throw new HyracksDataException("Message exceded maximum size");
+        }
+        // Prepare the message buffer
+        ByteBuffer msgBuffer = messagingFramePool.get();
+        if (msgBuffer == null) {
+            throw new HyracksDataException("Could not get an empty buffer");
+        }
+        msgBuffer.clear();
+        msgBuffer.put(serializedMsg);
+        msgBuffer.flip();
+        // Give the buffer to the channel write interface for writing
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(msgBuffer);
+    }
+
     private void handleTakeoverPartitons(IMessage message) throws Exception {
         TakeoverPartitionsRequestMessage msg = (TakeoverPartitionsRequestMessage) message;
         //if the NC is shutting down, it should ignore takeover partitions request
@@ -138,7 +201,7 @@
                 //send response after takeover is completed
                 TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
                         appContext.getTransactionSubsystem().getId(), msg.getPartitions());
-                sendMessage(reponse, null);
+                sendMessageToCC(reponse, null);
             }
         }
     }
@@ -148,20 +211,19 @@
             appContext.initializeMetadata(false);
             appContext.exportMetadataNodeStub();
         } finally {
-            TakeoverMetadataNodeResponseMessage reponse =
-                    new TakeoverMetadataNodeResponseMessage(appContext.getTransactionSubsystem().getId());
-            sendMessage(reponse, null);
+            TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
+                    appContext.getTransactionSubsystem().getId());
+            sendMessageToCC(reponse, null);
         }
     }
 
-    @Override
     public void reportMaxResourceId() throws Exception {
         ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage();
         //resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for metadata indexes.
         long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
                 MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
         maxResourceIdMsg.setMaxResourceId(maxResourceId);
-        sendMessage(maxResourceIdMsg, null);
+        sendMessageToCC(maxResourceIdMsg, null);
     }
 
     private void handleReplicaEvent(IMessage message) {
@@ -194,16 +256,16 @@
         }
 
         //mark the partitions to be closed as inactive
-        PersistentLocalResourceRepository localResourceRepo =
-                (PersistentLocalResourceRepository) appContext.getLocalResourceRepository();
+        PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext
+                .getLocalResourceRepository();
         for (Integer partitionId : msg.getPartitions()) {
             localResourceRepo.addInactivePartition(partitionId);
         }
 
         //send response after partitions prepared for failback
-        PreparePartitionsFailbackResponseMessage reponse =
-                new PreparePartitionsFailbackResponseMessage(msg.getPlanId(), msg.getRequestId(), msg.getPartitions());
-        sendMessage(reponse, null);
+        PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(msg.getPlanId(),
+                msg.getRequestId(), msg.getPartitions());
+        sendMessageToCC(reponse, null);
     }
 
     private void handleCompleteFailbackRequest(IMessage message) throws Exception {
@@ -212,9 +274,42 @@
             IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
             remoteRecoeryManager.completeFailbackProcess();
         } finally {
-            CompleteFailbackResponseMessage reponse =
-                    new CompleteFailbackResponseMessage(msg.getPlanId(), msg.getRequestId(), msg.getPartitions());
-            sendMessage(reponse, null);
+            CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(msg.getPlanId(),
+                    msg.getRequestId(), msg.getPartitions());
+            sendMessageToCC(reponse, null);
+        }
+    }
+
+    private class MessageDeliveryService implements Runnable {
+        /*
+         * TODO Currently this thread is not stopped when it is interrupted because
+         * NC2NC messaging might be used during nodes shutdown coordination and the
+         * JVM shutdown hook might interrupt while it is still needed. If NC2NC
+         * messaging wont be used during shutdown, then this thread needs to be
+         * gracefully stopped using a POSION_PILL or when interrupted during the
+         * shutdown.
+         */
+        @Override
+        public void run() {
+            while (true) {
+                IApplicationMessage msg = null;
+                try {
+                    msg = receivedMsgsQ.take();
+                    //TODO add nodeId to IApplicationMessage and pass it
+                    receivedMessage(msg, null);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.WARNING) && msg != null) {
+                        LOGGER.log(Level.WARNING, "Could not process message with id: " + msg.getId() + " and type: "
+                                + msg.getMessageType().name(), e);
+                    } else {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.log(Level.WARNING, "Could not process message", e);
+                        }
+                    }
+                }
+            }
         }
     }
 }
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
index cbd5c93..544f4ec 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -98,4 +98,16 @@
     <description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
     </description>
   </property>
+  <property>
+    <name>messaging.frame.size</name>
+    <value>4096</value>
+    <description>The frame size to be used for NC to NC messaging. (Default = 4kb)
+    </description>
+  </property>
+  <property>
+    <name>messaging.frame.count</name>
+    <value>512</value>
+    <description>Number of reusable frames for NC to NC messaging. (Default = 512)
+    </description>
+  </property>
 </asterixConfiguration>
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index ab511fb..bfdc834 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -257,6 +257,12 @@
       <artifactId>log4j</artifactId>
       <version>1.2.17</version>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>2.0.2-beta</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java
index e6f383f..43425ab 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java
@@ -34,4 +34,6 @@
     AsterixBuildProperties getBuildProperties();
 
     public AsterixReplicationProperties getReplicationProperties();
+
+    public MessagingProperties getMessagingProperties();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MessagingProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MessagingProperties.java
new file mode 100644
index 0000000..6794c8d
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MessagingProperties.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.config;
+
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
+
+public class MessagingProperties extends AbstractAsterixProperties {
+
+    private static final String MESSAGING_FRAME_SIZE_KEY = "messaging.frame.size";
+    private static final int MESSAGING_FRAME_SIZE_DEFAULT = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
+
+    private static final String MESSAGING_FRAME_COUNT_KEY = "messaging.frame.count";
+    private static final int MESSAGING_BUFFER_COUNTE_DEFAULT = 512;
+
+    public MessagingProperties(AsterixPropertiesAccessor accessor) {
+        super(accessor);
+    }
+
+    public int getFrameSize() {
+        return accessor.getProperty(MESSAGING_FRAME_SIZE_KEY, MESSAGING_FRAME_SIZE_DEFAULT,
+                PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
+
+    public int getFrameCount() {
+        return accessor.getProperty(MESSAGING_FRAME_COUNT_KEY, MESSAGING_BUFFER_COUNTE_DEFAULT,
+                PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ConcurrentFramePool.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/memory/ConcurrentFramePool.java
similarity index 94%
rename from asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ConcurrentFramePool.java
rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/memory/ConcurrentFramePool.java
index afe3b06..d57a4fc 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ConcurrentFramePool.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/memory/ConcurrentFramePool.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.active;
+package org.apache.asterix.common.memory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
@@ -25,16 +25,17 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
 
 public class ConcurrentFramePool {
     private static final boolean DEBUG = false;
-    private static final String ERROR_INVALID_FRAME_SIZE =
-            "The size should be an integral multiple of the default frame size";
-    private static final String ERROR_LARGER_THAN_BUDGET_REQUEST =
-            "The requested frame size must not be greater than the allocated budget";
+    private static final String ERROR_INVALID_FRAME_SIZE = "The size should be an integral "
+            + "multiple of the default frame size";
+    private static final String ERROR_LARGER_THAN_BUDGET_REQUEST = "The requested frame size"
+            + " must not be greater than the allocated budget";
     private static final Logger LOGGER = Logger.getLogger(ConcurrentFramePool.class.getName());
     private final String nodeId;
     private final int budget;
@@ -232,7 +233,8 @@
                 try {
                     frameAction.call(freeBuffer);
                 } catch (Exception e) {
-                    LOGGER.error("Error while attempting to answer a subscription. Buffer will be reclaimed", e);
+                    LOGGER.log(Level.SEVERE,
+                            "Error while attempting to answer a subscription. Buffer will be reclaimed", e);
                     // TODO(amoudi): Add test cases and get rid of recursion
                     if (handedOut == handedOutBeforeCall) {
                         release(freeBuffer);
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/FrameAction.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/memory/FrameAction.java
similarity index 96%
rename from asterixdb/asterix-active/src/main/java/org/apache/asterix/active/FrameAction.java
rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/memory/FrameAction.java
index 849d360..366125a 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/FrameAction.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/memory/FrameAction.java
@@ -16,11 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.active;
+package org.apache.asterix.common.memory;
 
 import java.nio.ByteBuffer;
-
-import org.apache.log4j.Logger;
+import java.util.logging.Logger;
 
 public class FrameAction {
     private static final boolean DEBUG = false;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 9742a6c..5f08dd8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -44,7 +44,7 @@
 
     /**
      * Sets a unique message id that identifies this message within an NC.
-     * This id is set by {@link INCMessageBroker#sendMessage(IApplicationMessage, IApplicationMessageCallback)}
+     * This id is set by {@link INCMessageBroker#sendMessageToCC(IApplicationMessage, IApplicationMessageCallback)}
      * when the callback is not null to notify the sender when the response to that message is received.
      *
      * @param messageId
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index 41f8a0c..f01d0c3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -29,12 +29,22 @@
      * @param callback
      * @throws Exception
      */
-    public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception;
+    public void sendMessageToCC(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception;
 
     /**
-     * Sends the maximum resource id on this NC to the CC.
+     * Sends application message from this NC to another NC.
      *
+     * @param message
+     * @param callback
      * @throws Exception
      */
-    public void reportMaxResourceId() throws Exception;
+    public void sendMessageToNC(String nodeId, IApplicationMessage message, IApplicationMessageCallback callback)
+            throws Exception;
+
+    /**
+     * Queue a message to this {@link INCMessageBroker} for processing
+     *
+     * @param msg
+     */
+    public void queueReceivedMessage(IApplicationMessage msg);
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
similarity index 92%
rename from asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
rename to asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
index 0f6a2ea..10c6b24 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/ConcurrentFramePoolUnitTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/memory/ConcurrentFramePoolUnitTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.feed.test;
+package org.apache.asterix.test.memory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
@@ -24,9 +24,9 @@
 import java.util.Random;
 import java.util.concurrent.LinkedBlockingDeque;
 
-import org.apache.asterix.active.ConcurrentFramePool;
-import org.apache.asterix.active.FrameAction;
 import org.apache.asterix.common.config.AsterixFeedProperties;
+import org.apache.asterix.common.memory.ConcurrentFramePool;
+import org.apache.asterix.common.memory.FrameAction;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.junit.Assert;
 import org.mockito.Mockito;
@@ -60,8 +60,8 @@
     public void testMemoryManager() {
         AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
         Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-        ConcurrentFramePool fmm =
-                new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+        ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(),
+                DEFAULT_FRAME_SIZE);
         int i = 0;
         while (fmm.get() != null) {
             i++;
@@ -75,8 +75,8 @@
         try {
             AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
             Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
             FixedSizeAllocator[] runners = new FixedSizeAllocator[NUM_THREADS];
             Thread[] threads = new Thread[NUM_THREADS];
             Arrays.parallelSetAll(runners, (int i) -> new FixedSizeAllocator(fmm));
@@ -106,8 +106,8 @@
         try {
             AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
             Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
             Random random = new Random();
             int i = 0;
             int req;
@@ -141,8 +141,8 @@
         try {
             AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
             Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
 
             VarSizeAllocator[] runners = new VarSizeAllocator[NUM_THREADS];
             Thread[] threads = new Thread[NUM_THREADS];
@@ -180,8 +180,8 @@
     public void testAcquireReleaseMemoryManager() throws HyracksDataException {
         AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
         Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-        ConcurrentFramePool fmm =
-                new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+        ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(),
+                DEFAULT_FRAME_SIZE);
         Random random = new Random();
         ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
         while (true) {
@@ -213,8 +213,8 @@
         try {
             AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
             Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
             FixedSizeGoodAllocator[] runners = new FixedSizeGoodAllocator[NUM_THREADS];
             Thread[] threads = new Thread[NUM_THREADS];
             Arrays.parallelSetAll(runners, (int i) -> new FixedSizeGoodAllocator(fmm));
@@ -244,8 +244,8 @@
         try {
             AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
             Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
             Random random = new Random();
             ArrayDeque<ByteBuffer> stack = new ArrayDeque<>();
             int i = 0;
@@ -297,8 +297,8 @@
         try {
             AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
             Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
             VarSizeGoodAllocator[] runners = new VarSizeGoodAllocator[NUM_THREADS];
             Thread[] threads = new Thread[NUM_THREADS];
             Arrays.parallelSetAll(runners, (int i) -> new VarSizeGoodAllocator(fmm));
@@ -333,8 +333,8 @@
         try {
             AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
             Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
             int i = 0;
             ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
             LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
@@ -399,8 +399,8 @@
         try {
             AsterixFeedProperties afp = Mockito.mock(AsterixFeedProperties.class);
             Mockito.when(afp.getMemoryComponentGlobalBudget()).thenReturn(FEED_MEM_BUDGET);
-            ConcurrentFramePool fmm =
-                    new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(), DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool fmm = new ConcurrentFramePool("TestNode", afp.getMemoryComponentGlobalBudget(),
+                    DEFAULT_FRAME_SIZE);
             int i = 0;
             ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
             LinkedBlockingDeque<ByteBuffer> buffers = new LinkedBlockingDeque<>();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index d4e3641..cd04515 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -24,8 +24,8 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.ConcurrentFramePool;
-import org.apache.asterix.active.FrameAction;
+import org.apache.asterix.common.memory.ConcurrentFramePool;
+import org.apache.asterix.common.memory.FrameAction;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.util.FeedUtils.Mode;
@@ -73,11 +73,12 @@
             throws HyracksDataException {
         this.writer = writer;
 
-        this.spiller =
-                fpa.spillToDiskOnCongestion() ? new FrameSpiller(ctx,
+        this.spiller = fpa.spillToDiskOnCongestion()
+                ? new FrameSpiller(ctx,
                         connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
                                 + runtimeId.getRuntimeName() + "_" + runtimeId.getPartition(),
-                        fpa.getMaxSpillOnDisk()) : null;
+                        fpa.getMaxSpillOnDisk())
+                : null;
         this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
         this.fpa = fpa;
         this.framePool = framePool;
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index e643206..171d271 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -26,8 +26,8 @@
 import java.util.concurrent.Future;
 
 import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.ConcurrentFramePool;
 import org.apache.asterix.active.EntityId;
+import org.apache.asterix.common.memory.ConcurrentFramePool;
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -114,11 +114,11 @@
             Random random = new Random();
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // No spill, No discard
-            FeedPolicyAccessor fpa =
-                    createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE,
+                    DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer =
-                    FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
+            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
+                    false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -156,11 +156,11 @@
             int numRounds = 10;
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // No spill, No discard
-            FeedPolicyAccessor fpa =
-                    createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE,
+                    DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer =
-                    FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
+            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
+                    false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -205,14 +205,14 @@
             int totalMinFrames = 0;
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // Spill budget = Memory budget, No discard
-            FeedPolicyAccessor fpa =
-                    createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames,
+                    DISCARD_ALLOWANCE);
             // Non-Active Writer
             TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
-            ConcurrentFramePool framePool =
-                    new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE,
+                    DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
             handler.open();
             ByteBuffer buffer1 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
@@ -275,8 +275,8 @@
             Assert.assertEquals(0, handler.getNumDiscarded());
             // We can only discard one frame
             double numDiscarded = 0;
-            boolean nextShouldDiscard =
-                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa
+                    .getMaxFractionDiscard();
             while (nextShouldDiscard) {
                 handler.nextFrame(buffer5);
                 numDiscarded++;
@@ -315,14 +315,14 @@
             int numberOfSpillFrames = 50;
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // Spill budget = Memory budget, No discard
-            FeedPolicyAccessor fpa =
-                    createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames,
+                    DISCARD_ALLOWANCE);
             // Non-Active Writer
             TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
-            ConcurrentFramePool framePool =
-                    new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE,
+                    DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
             handler.open();
             VSizeFrame frame = new VSizeFrame(ctx);
@@ -345,8 +345,8 @@
             Assert.assertEquals(0, handler.getNumDiscarded());
             // We can only discard one frame
             double numDiscarded = 0;
-            boolean nextShouldDiscard =
-                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa
+                    .getMaxFractionDiscard();
             while (nextShouldDiscard) {
                 handler.nextFrame(frame.getBuffer());
                 numDiscarded++;
@@ -394,8 +394,8 @@
             TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
-            ConcurrentFramePool framePool =
-                    new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE,
+                    DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
             handler.open();
             // add NUM_FRAMES times
@@ -411,8 +411,8 @@
             }
             // Next call should NOT block but should discard.
             double numDiscarded = 0.0;
-            boolean nextShouldDiscard =
-                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa
+                    .getMaxFractionDiscard();
             while (nextShouldDiscard) {
                 handler.nextFrame(buffer);
                 numDiscarded++;
@@ -456,8 +456,8 @@
             TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
-            ConcurrentFramePool framePool =
-                    new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE);
+            ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE,
+                    DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
             handler.open();
             VSizeFrame frame = new VSizeFrame(ctx);
@@ -467,8 +467,8 @@
             }
             // Next 5 calls call should NOT block but should discard.
             double numDiscarded = 0.0;
-            boolean nextShouldDiscard =
-                    ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard();
+            boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa
+                    .getMaxFractionDiscard();
             while (nextShouldDiscard) {
                 handler.nextFrame(frame.getBuffer());
                 numDiscarded++;
@@ -507,8 +507,8 @@
         try {
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             // Spill budget = Memory budget, No discard
-            FeedPolicyAccessor fpa =
-                    createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
+            FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES,
+                    DISCARD_ALLOWANCE);
             // Non-Active Writer
             TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
@@ -554,8 +554,8 @@
             // No spill, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer =
-                    FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
+            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
+                    false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -595,8 +595,8 @@
             // No spill, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer =
-                    FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
+            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(),
+                    false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -683,8 +683,8 @@
                 Random random = new Random();
                 IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
                 // Spill budget = Memory budget, No discard
-                FeedPolicyAccessor fpa =
-                        createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
+                FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES,
+                        DISCARD_ALLOWANCE);
                 // Non-Active Writer
                 TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
                 writer.freeze();
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
index bf103fc..e7b15a2 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
@@ -33,6 +33,7 @@
 import org.apache.asterix.common.config.AsterixStorageProperties;
 import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
+import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -62,6 +63,7 @@
     private AsterixBuildProperties buildProperties;
     private AsterixReplicationProperties replicationProperties;
     private AsterixExtensionProperties extensionProperties;
+    private MessagingProperties messagingProperties;
     private final IGlobalRecoveryMaanger globalRecoveryMaanger;
     private IHyracksClientConnection hcc;
     private final ILibraryManager libraryManager;
@@ -92,10 +94,11 @@
         INSTANCE.txnProperties = new AsterixTransactionProperties(propertiesAccessor);
         INSTANCE.feedProperties = new AsterixFeedProperties(propertiesAccessor);
         INSTANCE.extensionProperties = new AsterixExtensionProperties(propertiesAccessor);
-        INSTANCE.replicationProperties =
-                new AsterixReplicationProperties(propertiesAccessor, AsterixClusterProperties.INSTANCE.getCluster());
+        INSTANCE.replicationProperties = new AsterixReplicationProperties(propertiesAccessor,
+                AsterixClusterProperties.INSTANCE.getCluster());
         INSTANCE.hcc = hcc;
         INSTANCE.buildProperties = new AsterixBuildProperties(propertiesAccessor);
+        INSTANCE.messagingProperties = new MessagingProperties(propertiesAccessor);
         Logger.getLogger("org.apache").setLevel(INSTANCE.externalProperties.getLogLevel());
     }
 
@@ -191,4 +194,9 @@
     public AsterixExtensionProperties getExtensionProperties() {
         return extensionProperties;
     }
+
+    @Override
+    public MessagingProperties getMessagingProperties() {
+        return messagingProperties;
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
index 5b29530..ab1ebe1 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
@@ -57,7 +57,7 @@
             //if no response available or it has an exception, request a new one
             if (reponse == null || reponse.getException() != null) {
                 ResourceIdRequestMessage msg = new ResourceIdRequestMessage();
-                ((INCMessageBroker) appCtx.getMessageBroker()).sendMessage(msg, this);
+                ((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg, this);
                 reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
                 if (reponse.getException() != null) {
                     throw new HyracksDataException(reponse.getException().getMessage());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java
index 77934c6..b1aa45f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.api.application;
 
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
 import org.apache.hyracks.api.context.IHyracksRootContext;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.resources.memory.IMemoryManager;
@@ -77,4 +78,19 @@
      * @param handler
      */
     public void setStateDumpHandler(IStateDumpHandler handler);
+
+    /**
+     * Set the application MessagingChannelInterfaceFactory
+     *
+     * @param interfaceFactory
+     */
+    public void setMessagingChannelInterfaceFactory(IChannelInterfaceFactory interfaceFactory);
+
+    /**
+     * Get the application MessagingChannelInterfaceFactory previously set by
+     * the {@link #setMessagingChannelInterfaceFactory(IChannelInterfaceFactory)} call.
+     *
+     * @return
+     */
+    public IChannelInterfaceFactory getMessagingChannelInterfaceFactory();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
index c41dafe..a79b955 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
@@ -33,12 +33,15 @@
 
     private final NetworkAddress datasetNetworkAddress;
 
+    private final NetworkAddress messagingNetworkAddress;
+
     public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress,
-            NetworkAddress datasetNetworkAddress) {
+            NetworkAddress datasetNetworkAddress, NetworkAddress messagingNetworkAddress) {
         this.nodeId = nodeId;
         this.status = status;
         this.netAddress = netAddress;
         this.datasetNetworkAddress = datasetNetworkAddress;
+        this.messagingNetworkAddress = messagingNetworkAddress;
     }
 
     public String getNodeId() {
@@ -56,4 +59,8 @@
     public NetworkAddress getDatasetNetworkAddress() {
         return datasetNetworkAddress;
     }
+
+    public NetworkAddress getMessagingNetworkAddress() {
+        return messagingNetworkAddress;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/IBufferAcceptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
similarity index 94%
rename from hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/IBufferAcceptor.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
index d7febd2..585b6bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/IBufferAcceptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.buffers;
+package org.apache.hyracks.api.comm;
 
 import java.nio.ByteBuffer;
 
@@ -25,6 +25,7 @@
  *
  * @author vinayakb
  */
+@FunctionalInterface
 public interface IBufferAcceptor {
     /**
      * Accept a buffer.
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IBufferFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
similarity index 82%
rename from hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IBufferFactory.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
index 53299d8..5b3a233 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IBufferFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
@@ -16,15 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.protocols.muxdemux;
+package org.apache.hyracks.api.comm;
 
 import java.nio.ByteBuffer;
 
-/**
- * @author yingyib
- */
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@FunctionalInterface
 public interface IBufferFactory {
 
-    public ByteBuffer createBuffer();
+    public ByteBuffer createBuffer() throws HyracksDataException;
 
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java
new file mode 100644
index 0000000..02de858
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+public interface IChannelControlBlock {
+
+    /**
+     * Get the read interface of this channel.
+     *
+     * @return the read interface.
+     */
+    public IChannelReadInterface getReadInterface();
+
+    /**
+     * Get the write interface of this channel.
+     *
+     * @return the write interface.
+     */
+    public IChannelWriteInterface getWriteInterface();
+
+    /**
+     * Add write credit to this channel.
+     *
+     * @param delta
+     *            number of bytes
+     */
+    public void addWriteCredits(int delta);
+
+    /**
+     * @return The channel's unique id within its ChannelSet.
+     */
+    public int getChannelId();
+
+    /**
+     * Add pending credit.
+     *
+     * @param credit
+     */
+    public void addPendingCredits(int credit);
+
+    /**
+     * Increments the pending write operations of this channel.
+     */
+    public void markPendingWrite();
+
+    /**
+     * Clears the pending write operations of this channel.
+     */
+    public void unmarkPendingWrite();
+
+    /**
+     * Sets a flag indicating this channel was closed locally.
+     */
+    public void reportLocalEOS();
+
+    /**
+     * A flag indicating if the channel was closed on the remote side.
+     *
+     * @return
+     */
+    public boolean isRemotelyClosed();
+
+    /**
+     * Complete the current write operation on this channel.
+     */
+    public void writeComplete();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelInterfaceFactory.java
similarity index 60%
copy from hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
copy to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelInterfaceFactory.java
index 1697cfd..d147fc7 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelInterfaceFactory.java
@@ -16,25 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.buffers;
+package org.apache.hyracks.api.comm;
 
-/**
- * A buffer acceptor that can be closed to indicate end of transmission or an error code
- * specified to indicate an error in transmission.
- *
- * @author vinayakb
- */
-public interface ICloseableBufferAcceptor extends IBufferAcceptor {
-    /**
-     * Close the buffer acceptor.
-     */
-    public void close();
+public interface IChannelInterfaceFactory {
 
     /**
-     * Indicate that an error occurred.
+     * Creates {@link IChannelReadInterface} and assigns the passed
+     * {@link IChannelControlBlock} to it.
      *
-     * @param ecode
-     *            - the error code.
+     * @param ccb
+     * @return
      */
-    public void error(int ecode);
+    public IChannelReadInterface createReadInterface(IChannelControlBlock ccb);
+
+    /**
+     * Creates {@link IChannelWriteInterface} and assigns the passed
+     * {@link IChannelControlBlock} to it.
+     *
+     * @param ccb
+     * @return
+     */
+    public IChannelWriteInterface createWriteInterface(IChannelControlBlock ccb);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
similarity index 61%
rename from hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
index 8639fb7..357d761 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
@@ -16,13 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.protocols.muxdemux;
+package org.apache.hyracks.api.comm;
 
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+import org.apache.hyracks.api.exceptions.NetException;
 
 /**
- * Represents the read interface of a {@link ChannelControlBlock}.
+ * Represents the read interface of a {@link IChannelControlBlock}.
  *
  * @author vinayakb
  */
@@ -56,4 +58,42 @@
      *            - the size of each buffer
      */
     public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize);
+
+    /**
+     * Try to read as much as {@code size} bytes from {@code sc}
+     *
+     * @param sc
+     * @param size
+     * @return The number of read bytes.
+     * @throws IOException
+     * @throws NetException
+     */
+    public int read(SocketChannel sc, int size) throws IOException, NetException;
+
+    /**
+     * Sets the read credits of this {@link IChannelReadInterface}
+     *
+     * @param credits
+     */
+    public void setReadCredits(int credits);
+
+    /**
+     * @return The current read credits of this {@link IChannelReadInterface}
+     */
+    public int getCredits();
+
+    /**
+     * Forces the current read buffer to be flushed
+     */
+    public void flush();
+
+    /**
+     * @return The current full buffer acceptor of this {@link IChannelReadInterface}
+     */
+    public ICloseableBufferAcceptor getFullBufferAcceptor();
+
+    /**
+     * @return The buffer factory used by this {@link IChannelReadInterface}
+     */
+    public IBufferFactory getBufferFactory();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
similarity index 64%
rename from hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
index dc38ea4..993dd2c 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
@@ -16,15 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.protocols.muxdemux;
+package org.apache.hyracks.api.comm;
 
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.exceptions.NetException;
 
 /**
- * Represents the write interface of a {@link ChannelControlBlock}.
- *
- * @author vinayakb
+ * Represents the write interface of a {@link IChannelControlBlock}.
  */
 public interface IChannelWriteInterface {
     /**
@@ -56,4 +53,36 @@
      *            - the size of each buffer
      */
     public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize);
+
+    /**
+     * Performs a pending write operation based on the current state of
+     * this {@link IChannelWriteInterface}
+     *
+     * @param writerState
+     * @throws NetException
+     */
+    public void write(IConnectionWriterState writerState) throws NetException;
+
+    /**
+     * Completes the current write operation on this {@link IChannelWriteInterface}
+     */
+    public void writeComplete();
+
+    /**
+     * Add credits to this this {@link IChannelWriteInterface}
+     *
+     * @param credit
+     */
+    public void addCredits(int credit);
+
+    /**
+     * @return The current credits of this {@link IChannelWriteInterface}
+     */
+    public int getCredits();
+
+    /**
+     * Adjusts the {@link IChannelControlBlock} writability based on the current
+     * state of this {@link IChannelWriteInterface}
+     */
+    public void adjustChannelWritability();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java
similarity index 96%
rename from hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java
index 1697cfd..fba0eaf 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.buffers;
+package org.apache.hyracks.api.comm;
 
 /**
  * A buffer acceptor that can be closed to indicate end of transmission or an error code
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/IBufferAcceptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IConnectionWriterState.java
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/IBufferAcceptor.java
copy to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IConnectionWriterState.java
index d7febd2..1f4f204 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/IBufferAcceptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IConnectionWriterState.java
@@ -16,20 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.buffers;
+package org.apache.hyracks.api.comm;
 
 import java.nio.ByteBuffer;
 
-/**
- * Accepts buffers.
- *
- * @author vinayakb
- */
-public interface IBufferAcceptor {
+public interface IConnectionWriterState {
+
     /**
-     * Accept a buffer.
+     * Resets the connection write state based on the passed parameters.
      *
-     * @param buffer
+     * @param pendingBuffer
+     * @param pendingWriteSize
+     * @param ccb
      */
-    public void accept(ByteBuffer buffer);
+    public void reset(ByteBuffer pendingBuffer, int pendingWriteSize, IChannelControlBlock ccb);
+
+    /**
+     * @return The command of this {@link IConnectionWriterState}
+     */
+    public MuxDemuxCommand getCommand();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/MuxDemuxCommand.java
similarity index 87%
rename from hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/MuxDemuxCommand.java
index b53cecf..22b77f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/MuxDemuxCommand.java
@@ -16,20 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.protocols.muxdemux;
+package org.apache.hyracks.api.comm;
 
 import java.nio.ByteBuffer;
 
-import org.apache.hyracks.net.exceptions.NetException;
+import org.apache.hyracks.api.exceptions.NetException;
 
-class MuxDemuxCommand {
-    static final int MAX_CHANNEL_ID = Integer.MAX_VALUE - 1;
+public class MuxDemuxCommand {
+    public static final int MAX_CHANNEL_ID = Integer.MAX_VALUE - 1;
 
-    static final int COMMAND_SIZE = 8;
+    public static final int COMMAND_SIZE = 8;
 
-    static final int MAX_DATA_VALUE = 0x1fffffff;
+    public static final int MAX_DATA_VALUE = 0x1fffffff;
 
-    enum CommandType {
+    public enum CommandType {
         OPEN_CHANNEL,
         CLOSE_CHANNEL,
         CLOSE_CHANNEL_ACK,
@@ -90,4 +90,4 @@
     public String toString() {
         return channelId + ":" + type + ":" + data;
     }
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/exceptions/NetException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NetException.java
similarity index 93%
rename from hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/exceptions/NetException.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NetException.java
index 659e6b5..eaf67af 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/exceptions/NetException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NetException.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.exceptions;
+package org.apache.hyracks.api.exceptions;
 
 public class NetException extends Exception {
     private static final long serialVersionUID = 1L;
 
     public NetException() {
+        // empty constructor
     }
 
     public NetException(String message, Throwable cause) {
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
index 6f42410..c238ae3 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
@@ -22,9 +22,10 @@
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 
+import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
-import org.apache.hyracks.net.exceptions.NetException;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemux;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
@@ -38,7 +39,7 @@
         /* This is a connect only socket and does not listen to any incoming connections, so pass null to
          * localAddress and listener.
          */
-        md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS);
+        md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS, FullFrameChannelInterfaceFactory.INSTANCE);
     }
 
     public void start() throws IOException {
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
index fba7cf5..e3c6f4a 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
@@ -27,13 +27,13 @@
 
 import org.apache.hyracks.api.channels.IInputChannel;
 import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.context.IHyracksCommonContext;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
-import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
 public class DatasetNetworkInputChannel implements IInputChannel {
     private static final Logger LOGGER = Logger.getLogger(DatasetNetworkInputChannel.class.getName());
@@ -54,7 +54,7 @@
 
     private final int nBuffers;
 
-    private ChannelControlBlock ccb;
+    private IChannelControlBlock ccb;
 
     private IInputChannelMonitor monitor;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/IChannelConnectionFactory.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/IChannelConnectionFactory.java
index 2e66f54..fbcb0da 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/IChannelConnectionFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/IChannelConnectionFactory.java
@@ -20,9 +20,9 @@
 
 import java.net.SocketAddress;
 
-import org.apache.hyracks.net.exceptions.NetException;
-import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.exceptions.NetException;
 
 public interface IChannelConnectionFactory {
-    public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException;
+    public IChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
index 235536f..a846da3 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
@@ -27,12 +27,12 @@
 
 import org.apache.hyracks.api.channels.IInputChannel;
 import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.context.IHyracksCommonContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.partitions.PartitionId;
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
-import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
 public class NetworkInputChannel implements IInputChannel {
     private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
@@ -49,7 +49,7 @@
 
     private final int nBuffers;
 
-    private ChannelControlBlock ccb;
+    private IChannelControlBlock ccb;
 
     private IInputChannelMonitor monitor;
 
@@ -99,7 +99,8 @@
         }
         ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
         ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
-        ccb.getReadInterface().setBufferFactory(new ReadBufferFactory(nBuffers, ctx), nBuffers, ctx.getInitialFrameSize());
+        ccb.getReadInterface().setBufferFactory(new ReadBufferFactory(nBuffers, ctx), nBuffers,
+                ctx.getInitialFrameSize());
         ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
         writeBuffer.putLong(partitionId.getJobId().getId());
         writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 46220de..60e2e35 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -22,9 +22,9 @@
 import java.util.ArrayDeque;
 import java.util.Deque;
 
+import org.apache.hyracks.api.comm.IBufferAcceptor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
 public class NetworkOutputChannel implements IFrameWriter {
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
index 3aa77b9..876b3de 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
@@ -20,12 +20,9 @@
 
 import java.nio.ByteBuffer;
 
+import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.net.protocols.muxdemux.IBufferFactory;
 
-/**
- * @author yingyib
- */
 public class ReadBufferFactory implements IBufferFactory {
 
     private final int limit;
@@ -39,17 +36,12 @@
 
     @Override
     public ByteBuffer createBuffer() {
-        try {
-            if (counter >= limit) {
-                return null;
-            } else {
-                ByteBuffer frame = ByteBuffer.allocate(frameSize);
-                counter++;
-                return frame;
-            }
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
+        if (counter >= limit) {
+            return null;
+        } else {
+            ByteBuffer frame = ByteBuffer.allocate(frameSize);
+            counter++;
+            return frame;
         }
     }
-
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index d93ba0d..ff5832a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -23,10 +23,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.common.base.INodeController;
@@ -35,6 +31,9 @@
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 public class NodeControllerState {
     private static final int RRD_SIZE = 720;
@@ -47,6 +46,8 @@
 
     private final NetworkAddress datasetPort;
 
+    private final NetworkAddress messagingPort;
+
     private final Set<JobId> activeJobIds;
 
     private final String osName;
@@ -142,6 +143,7 @@
         ncConfig = reg.getNCConfig();
         dataPort = reg.getDataPort();
         datasetPort = reg.getDatasetPort();
+        messagingPort = reg.getMessagingPort();
         activeJobIds = new HashSet<JobId>();
 
         osName = reg.getOSName();
@@ -264,6 +266,10 @@
         return datasetPort;
     }
 
+    public NetworkAddress getMessagingPort() {
+        return messagingPort;
+    }
+
     public JSONObject toSummaryJSON() throws JSONException {
         JSONObject o = new JSONObject();
         o.put("node-id", ncConfig.nodeId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index 785a202..726bf12 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -40,11 +40,11 @@
 
     @Override
     public void run() {
-        Map<String, NodeControllerInfo> result = new LinkedHashMap<String, NodeControllerInfo>();
+        Map<String, NodeControllerInfo> result = new LinkedHashMap<>();
         Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
         for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
-            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort(), e
-                    .getValue().getDatasetPort()));
+            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort(),
+                    e.getValue().getDatasetPort(), e.getValue().getMessagingPort()));
         }
         callback.setValue(result);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index d08df60..64b9737 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -18,6 +18,12 @@
  */
 package org.apache.hyracks.control.common.controllers;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hyracks.api.application.IApplicationConfig;
 import org.apache.hyracks.control.common.application.IniApplicationConfig;
 import org.ini4j.Ini;
@@ -25,12 +31,6 @@
 import org.kohsuke.args4j.Option;
 import org.kohsuke.args4j.spi.StopOptionHandler;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.Map;
-
 public class NCConfig implements Serializable {
     private static final long serialVersionUID = 2L;
 
@@ -82,7 +82,7 @@
     @Option(name = "-result-public-port", usage = "Public IP port to announce dataset result distribution listener (default: same as -result-port; must set -result-public-ip-address also)", required = false)
     public int resultPublicPort = 0;
 
-    @Option(name = "-retries", usage ="Number of attempts to contact CC before giving up (default = 5)")
+    @Option(name = "-retries", usage = "Number of attempts to contact CC before giving up (default = 5)")
     public int retries = 5;
 
     @Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
@@ -112,6 +112,23 @@
     @Option(name = "-config-file", usage = "Specify path to local configuration file (default: no local config)", required = false)
     public String configFile = null;
 
+    //TODO add messaging values to NC start scripts
+    @Option(name = "-messaging-ip-address", usage = "IP Address to bind messaging "
+            + "listener (default: same as -address)", required = false)
+    public String messagingIPAddress;
+
+    @Option(name = "-messaging-port", usage = "IP port to bind messaging listener "
+            + "(default: random port)", required = false)
+    public int messagingPort = 0;
+
+    @Option(name = "-messaging-public-ip-address", usage = "Public IP Address to announce messaging"
+            + " listener (default: same as -messaging-ip-address)", required = false)
+    public String messagingPublicIPAddress;
+
+    @Option(name = "-messaging-public-port", usage = "Public IP port to announce messaging listener"
+            + " (default: same as -messaging-port; must set -messaging-public-port also)", required = false)
+    public int messagingPublicPort = 0;
+
     @Argument
     @Option(name = "--", handler = StopOptionHandler.class)
     public List<String> appArgs;
@@ -139,13 +156,14 @@
         resultIPAddress = IniUtils.getString(ini, nodeSection, "result.address", resultIPAddress);
         resultPort = IniUtils.getInt(ini, nodeSection, "result.port", resultPort);
 
-        clusterNetPublicIPAddress = IniUtils.getString(
-                ini, nodeSection, "public.cluster.address", clusterNetPublicIPAddress);
+        clusterNetPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.cluster.address",
+                clusterNetPublicIPAddress);
         clusterNetPublicPort = IniUtils.getInt(ini, nodeSection, "public.cluster.port", clusterNetPublicPort);
         dataPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.data.address", dataPublicIPAddress);
         dataPublicPort = IniUtils.getInt(ini, nodeSection, "public.data.port", dataPublicPort);
         resultPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.result.address", resultPublicIPAddress);
         resultPublicPort = IniUtils.getInt(ini, nodeSection, "public.result.port", resultPublicPort);
+        //TODO pass messaging info from ini file
 
         retries = IniUtils.getInt(ini, nodeSection, "retries", retries);
 
@@ -169,23 +187,41 @@
         }
 
         // "address" is the default for all IP addresses
-        if (clusterNetIPAddress == null) clusterNetIPAddress = ipAddress;
-        if (dataIPAddress == null) dataIPAddress = ipAddress;
-        if (resultIPAddress == null) resultIPAddress = ipAddress;
+        if (clusterNetIPAddress == null) {
+            clusterNetIPAddress = ipAddress;
+        }
+        if (dataIPAddress == null) {
+            dataIPAddress = ipAddress;
+        }
+        if (resultIPAddress == null) {
+            resultIPAddress = ipAddress;
+        }
 
         // All "public" options default to their "non-public" versions
-        if (clusterNetPublicIPAddress == null) clusterNetPublicIPAddress = clusterNetIPAddress;
-        if (clusterNetPublicPort == 0) clusterNetPublicPort = clusterNetPort;
-        if (dataPublicIPAddress == null) dataPublicIPAddress = dataIPAddress;
-        if (dataPublicPort == 0) dataPublicPort = dataPort;
-        if (resultPublicIPAddress == null) resultPublicIPAddress = resultIPAddress;
-        if (resultPublicPort == 0) resultPublicPort = resultPort;
+        if (clusterNetPublicIPAddress == null) {
+            clusterNetPublicIPAddress = clusterNetIPAddress;
+        }
+        if (clusterNetPublicPort == 0) {
+            clusterNetPublicPort = clusterNetPort;
+        }
+        if (dataPublicIPAddress == null) {
+            dataPublicIPAddress = dataIPAddress;
+        }
+        if (dataPublicPort == 0) {
+            dataPublicPort = dataPort;
+        }
+        if (resultPublicIPAddress == null) {
+            resultPublicIPAddress = resultIPAddress;
+        }
+        if (resultPublicPort == 0) {
+            resultPublicPort = resultPort;
+        }
     }
 
     /**
      * @return An IApplicationConfig representing this NCConfig.
-     * Note: Currently this only includes the values from the configuration
-     * file, not anything specified on the command-line. QQQ
+     *         Note: Currently this only includes the values from the configuration
+     *         file, not anything specified on the command-line. QQQ
      */
     public IApplicationConfig getAppConfig() {
         return new IniApplicationConfig(ini);
@@ -215,10 +251,12 @@
         configuration.put("result-time-to-live", String.valueOf(resultTTL));
         configuration.put("result-sweep-threshold", String.valueOf(resultSweepThreshold));
         configuration.put("result-manager-memory", String.valueOf(resultManagerMemory));
-
+        configuration.put("messaging-ip-address", messagingIPAddress);
+        configuration.put("messaging-port", String.valueOf(messagingPort));
+        configuration.put("messaging-public-ip-address", messagingPublicIPAddress);
+        configuration.put("messaging-public-port", String.valueOf(messagingPublicPort));
         if (appNCMainClass != null) {
             configuration.put("app-nc-main-class", appNCMainClass);
         }
-
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index 5a23455..bb8022e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -65,10 +65,13 @@
 
     private final HeartbeatSchema hbSchema;
 
+    private final NetworkAddress messagingPort;
+
     public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
             NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors, String vmName,
             String vmVersion, String vmVendor, String classpath, String libraryPath, String bootClasspath,
-            List<String> inputArguments, Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
+            List<String> inputArguments, Map<String, String> systemProperties, HeartbeatSchema hbSchema,
+            NetworkAddress messagingPort) {
         this.ncAddress = ncAddress;
         this.nodeId = nodeId;
         this.ncConfig = ncConfig;
@@ -87,6 +90,7 @@
         this.inputArguments = inputArguments;
         this.systemProperties = systemProperties;
         this.hbSchema = hbSchema;
+        this.messagingPort = messagingPort;
     }
 
     public InetSocketAddress getNodeControllerAddress() {
@@ -160,4 +164,8 @@
     public Map<String, String> getSystemProperties() {
         return systemProperties;
     }
+
+    public NetworkAddress getMessagingPort() {
+        return messagingPort;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
index b015e3d..2d0a49d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
@@ -18,12 +18,12 @@
  */
 package org.apache.hyracks.control.nc;
 
-import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.kohsuke.args4j.CmdLineParser;
-
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.kohsuke.args4j.CmdLineParser;
+
 public class NCDriver {
     private static final Logger LOGGER = Logger.getLogger(NCDriver.class.getName());
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 43cac74..ac994a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -73,6 +73,7 @@
 import org.apache.hyracks.control.nc.io.profiling.IIOCounter;
 import org.apache.hyracks.control.nc.io.profiling.IOCounterFactory;
 import org.apache.hyracks.control.nc.net.DatasetNetworkManager;
+import org.apache.hyracks.control.nc.net.MessagingNetworkManager;
 import org.apache.hyracks.control.nc.net.NetworkManager;
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
@@ -91,6 +92,7 @@
 import org.apache.hyracks.ipc.api.IIPCI;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
 import org.apache.hyracks.ipc.impl.IPCSystem;
+import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 
 public class NodeControllerService implements IControllerService {
@@ -158,6 +160,8 @@
 
     private IIOCounter ioCounter;
 
+    private MessagingNetworkManager messagingNetManager;
+
     public NodeControllerService(NCConfig ncConfig) throws Exception {
         this.ncConfig = ncConfig;
         id = ncConfig.nodeId;
@@ -171,7 +175,8 @@
         }
         partitionManager = new PartitionManager(this);
         netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager,
-                ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
+                ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort,
+                FullFrameChannelInterfaceFactory.INSTANCE);
 
         lccm = new LifeCycleComponentManager();
         queue = new WorkQueue(Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
@@ -247,7 +252,12 @@
                 ncConfig.resultTTL, ncConfig.resultSweepThreshold);
         datasetNetworkManager = new DatasetNetworkManager(ncConfig.resultIPAddress, ncConfig.resultPort,
                 datasetPartitionManager, ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress,
-                ncConfig.resultPublicPort);
+                ncConfig.resultPublicPort, FullFrameChannelInterfaceFactory.INSTANCE);
+        if (ncConfig.messagingIPAddress != null && appCtx.getMessagingChannelInterfaceFactory() != null) {
+            messagingNetManager = new MessagingNetworkManager(this, ncConfig.messagingIPAddress, ncConfig.messagingPort,
+                    ncConfig.nNetThreads, ncConfig.messagingPublicIPAddress, ncConfig.messagingPublicPort,
+                    appCtx.getMessagingChannelInterfaceFactory());
+        }
     }
 
     @Override
@@ -260,8 +270,11 @@
         init();
 
         datasetNetworkManager.start();
-        IIPCHandle ccIPCHandle =
-                ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries);
+        if (messagingNetManager != null) {
+            messagingNetManager.start();
+        }
+        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort),
+                ncConfig.retries);
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
@@ -271,11 +284,13 @@
         // Use "public" versions of network addresses and ports
         NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
+        NetworkAddress meesagingPort = messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress()
+                : null;
         ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
                 osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
                 runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
                 runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
-                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
+                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort));
 
         synchronized (this) {
             while (registrationPending) {
@@ -338,6 +353,9 @@
             datasetPartitionManager.close();
             netManager.stop();
             datasetNetworkManager.stop();
+            if (messagingNetManager != null) {
+                messagingNetManager.stop();
+            }
             queue.stop();
             if (ncAppEntryPoint != null) {
                 ncAppEntryPoint.stop();
@@ -490,8 +508,7 @@
             CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
             switch (fn.getFunctionId()) {
                 case SEND_APPLICATION_MESSAGE: {
-                    CCNCFunctions.SendApplicationMessageFunction amf =
-                            (CCNCFunctions.SendApplicationMessageFunction) fn;
+                    CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
                     queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
                             amf.getDeploymentId(), amf.getNodeId()));
                     return;
@@ -516,8 +533,7 @@
                 }
 
                 case REPORT_PARTITION_AVAILABILITY: {
-                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
-                            (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
+                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
                     queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
                             rpaf.getPartitionId(), rpaf.getNetworkAddress()));
                     return;
@@ -530,8 +546,7 @@
                 }
 
                 case GET_NODE_CONTROLLERS_INFO_RESPONSE: {
-                    CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf =
-                            (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
+                    CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
                     setNodeControllersInfo(gncirf.getNodeControllerInfos());
                     return;
                 }
@@ -572,6 +587,10 @@
         return datasetPartitionManager;
     }
 
+    public MessagingNetworkManager getMessagingNetworkManager() {
+        return messagingNetManager;
+    }
+
     /**
      * Shutdown hook that invokes {@link NCApplicationEntryPoint#stop() stop} method.
      */
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
index d23c701..a2c3f4c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.application.IApplicationConfig;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.application.IStateDumpHandler;
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
 import org.apache.hyracks.api.context.IHyracksRootContext;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.resources.memory.IMemoryManager;
@@ -42,10 +43,11 @@
     private Object appObject;
     private IStateDumpHandler sdh;
     private final NodeControllerService ncs;
+    private IChannelInterfaceFactory messagingChannelInterfaceFactory;
 
-    public NCApplicationContext(NodeControllerService ncs, ServerContext serverCtx, IHyracksRootContext rootCtx, String nodeId,
-                                MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager,
-                                IApplicationConfig appConfig) throws IOException {
+    public NCApplicationContext(NodeControllerService ncs, ServerContext serverCtx, IHyracksRootContext rootCtx,
+            String nodeId, MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager,
+            IApplicationConfig appConfig) throws IOException {
         super(serverCtx, appConfig);
         this.lccm = lifeCyclecomponentManager;
         this.nodeId = nodeId;
@@ -108,4 +110,14 @@
     public IControllerService getControllerService() {
         return ncs;
     }
+
+    @Override
+    public IChannelInterfaceFactory getMessagingChannelInterfaceFactory() {
+        return messagingChannelInterfaceFactory;
+    }
+
+    @Override
+    public void setMessagingChannelInterfaceFactory(IChannelInterfaceFactory interfaceFactory) {
+        this.messagingChannelInterfaceFactory = interfaceFactory;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
index 3c99ef3..0b74806 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -25,15 +25,16 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
-import org.apache.hyracks.net.exceptions.NetException;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
 import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
@@ -58,19 +59,24 @@
     private NetworkAddress publicNetworkAddress;
 
     /**
-     * @param inetAddress - Internet address to bind the listen port to
-     * @param inetPort - Port to bind on inetAddress
-     * @param publicInetAddress - Internet address to report to consumers;
-     *    useful when behind NAT. null = same as inetAddress
-     * @param publicInetPort - Port to report to consumers; useful when
-     *    behind NAT. Ignored if publicInetAddress is null. 0 = same as inetPort
+     * @param inetAddress
+     *            - Internet address to bind the listen port to
+     * @param inetPort
+     *            - Port to bind on inetAddress
+     * @param publicInetAddress
+     *            - Internet address to report to consumers;
+     *            useful when behind NAT. null = same as inetAddress
+     * @param publicInetPort
+     *            - Port to report to consumers; useful when
+     *            behind NAT. Ignored if publicInetAddress is null. 0 = same as inetPort
      */
-    public DatasetNetworkManager(String inetAddress, int inetPort, IDatasetPartitionManager partitionManager, int nThreads,
-                                 int nBuffers, String publicInetAddress, int publicInetPort) throws IOException {
+    public DatasetNetworkManager(String inetAddress, int inetPort, IDatasetPartitionManager partitionManager,
+            int nThreads, int nBuffers, String publicInetAddress, int publicInetPort,
+            IChannelInterfaceFactory channelInterfaceFactory) {
         this.partitionManager = partitionManager;
         this.nBuffers = nBuffers;
         md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
-                MAX_CONNECTION_ATTEMPTS);
+                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
         // Just save these values for the moment; may be reset in start()
         publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
     }
@@ -84,12 +90,10 @@
         // make it a copy of localNetworkAddress
         if (publicNetworkAddress.getAddress() == null) {
             publicNetworkAddress = localNetworkAddress;
-        }
-        else {
+        } else {
             // Likewise for public port
             if (publicNetworkAddress.getPort() == 0) {
-                publicNetworkAddress = new NetworkAddress
-                    (publicNetworkAddress.getAddress(), sockAddr.getPort());
+                publicNetworkAddress = new NetworkAddress(publicNetworkAddress.getAddress(), sockAddr.getPort());
             }
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
new file mode 100644
index 0000000..7983b93
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import org.apache.hyracks.net.protocols.muxdemux.MuxDemux;
+import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+
+public class MessagingNetworkManager {
+
+    private static final Logger LOGGER = Logger.getLogger(MessagingNetworkManager.class.getName());
+    private static final int MAX_CONNECTION_ATTEMPTS = 5;
+    private final MuxDemux md;
+    private NetworkAddress localNetworkAddress;
+    private NetworkAddress publicNetworkAddress;
+    private final Map<String, IChannelControlBlock> ncChannels = new HashMap<>();
+    private final NodeControllerService ncs;
+    private final Map<IChannelControlBlock, ICloseableBufferAcceptor> channelFullBufferAcceptor = new HashMap<>();
+
+    public MessagingNetworkManager(NodeControllerService ncs, String inetAddress, int inetPort, int nThreads,
+            String publicInetAddress, int publicInetPort, IChannelInterfaceFactory channelInterfaceFactory) {
+        this.ncs = ncs;
+        md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
+                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
+        publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
+    }
+
+    public void start() throws IOException {
+        md.start();
+        InetSocketAddress sockAddr = md.getLocalAddress();
+        localNetworkAddress = new NetworkAddress(sockAddr.getHostString(), sockAddr.getPort());
+
+        // See if the public address was explicitly specified, and if not,
+        // make it a copy of localNetworkAddress
+        if (publicNetworkAddress.getAddress() == null) {
+            publicNetworkAddress = localNetworkAddress;
+        } else {
+            // Likewise for public port
+            if (publicNetworkAddress.getPort() == 0) {
+                publicNetworkAddress = new NetworkAddress(publicNetworkAddress.getAddress(), sockAddr.getPort());
+            }
+        }
+    }
+
+    public void stop() {
+        // Currently there is nothing to stop
+    }
+
+    public IChannelControlBlock getMessagingChannel(String nodeId) throws Exception {
+        synchronized (ncChannels) {
+            IChannelControlBlock ccb = ncChannels.get(nodeId);
+            if (ccb == null) {
+                // Establish new connection
+                ccb = establishNewConnection(nodeId);
+                addOpenChannel(nodeId, ccb);
+            }
+            return ccb;
+        }
+    }
+
+    private ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+        MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+        return mConn.openChannel();
+    }
+
+    public MuxDemuxPerformanceCounters getPerformanceCounters() {
+        return md.getPerformanceCounters();
+    }
+
+    public NetworkAddress getPublicNetworkAddress() {
+        return publicNetworkAddress;
+    }
+
+    private void prepareMessagingInitialMessage(String ncId, final ByteBuffer buffer) throws NetException {
+        /*
+         * The messaging initial message contains the node id of the node
+         * which requested the channel to be opened.
+         */
+        int intialMsgLength = Integer.BYTES + ncId.length();
+        if (intialMsgLength > buffer.capacity()) {
+            throw new NetException("Initial message exceded the channel buffer size " + buffer.capacity() + " bytes");
+        }
+        buffer.clear();
+        buffer.putInt(ncId.length());
+        buffer.put(ncId.getBytes());
+        buffer.flip();
+    }
+
+    private IChannelControlBlock establishNewConnection(String nodeId) throws Exception {
+        Map<String, NodeControllerInfo> nodeControllers = ncs.getNodeControllersInfo();
+
+        // Get the node messaging address from its info
+        NodeControllerInfo nodeControllerInfo = nodeControllers.get(nodeId);
+        if (nodeControllerInfo == null) {
+            throw new NetException("Could not find node: " + nodeId);
+        }
+        NetworkAddress nodeMessagingNeAddress = nodeControllerInfo.getMessagingNetworkAddress();
+        SocketAddress nodeAddress = new InetSocketAddress(InetAddress.getByName(nodeMessagingNeAddress.getAddress()),
+                nodeMessagingNeAddress.getPort());
+
+        // Open the channel
+        IChannelControlBlock ccb = connect(nodeAddress);
+        try {
+            // Prepare the initial message buffer
+            ByteBuffer initialBuffer = ccb.getReadInterface().getBufferFactory().createBuffer();
+            prepareMessagingInitialMessage(ncs.getId(), initialBuffer);
+            // Send the initial messaging channel handshake message to register the opened channel on both nodes
+            ccb.getWriteInterface().getFullBufferAcceptor().accept(initialBuffer);
+            return ccb;
+        } catch (NetException e) {
+            closeChannel(ccb);
+            throw e;
+        }
+    }
+
+    private void addOpenChannel(final String nodeId, final IChannelControlBlock ccb) {
+        synchronized (ncChannels) {
+            if (ncChannels.get(nodeId) == null) {
+                ncChannels.put(nodeId, ccb);
+            } else {
+                closeChannel(ccb);
+                /*
+                 * TODO Currently there is a chance that two nodes will open
+                 * a channel to each other at the exact same time and both will
+                 * end up using a half closed channel. While this isn't a big issue,
+                 * it should be eliminated by introducing negotiation protocol
+                 * between nodes to decide which channel to use and which channel
+                 * to close fully.
+                 */
+            }
+        }
+    }
+
+    private void closeChannel(IChannelControlBlock ccb) {
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
+    }
+
+    private class ChannelOpenListener implements IChannelOpenListener {
+        @Override
+        public void channelOpened(ChannelControlBlock channel) {
+            // Store the channel's original acceptor (which is set by the application)
+            ICloseableBufferAcceptor fullBufferAcceptor = channel.getReadInterface().getFullBufferAcceptor();
+            synchronized (channelFullBufferAcceptor) {
+                channelFullBufferAcceptor.put(channel, fullBufferAcceptor);
+            }
+            // Temporary set the acceptor to InitialBufferAcceptor to read the initial message
+            channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+        }
+    }
+
+    private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+        private final ChannelControlBlock ccb;
+
+        public InitialBufferAcceptor(ChannelControlBlock ccb) {
+            this.ccb = ccb;
+        }
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            String nodeId = readMessagingInitialMessage(buffer);
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Opened messaging channel with node: " + nodeId);
+            }
+            // Return the channel's original acceptor
+            ICloseableBufferAcceptor originalAcceptor;
+            synchronized (channelFullBufferAcceptor) {
+                originalAcceptor = channelFullBufferAcceptor.remove(ccb);
+                if (originalAcceptor == null) {
+                    throw new IllegalStateException("Could not find channel acceptor");
+                }
+            }
+            ccb.getReadInterface().setFullBufferAcceptor(originalAcceptor);
+            addOpenChannel(nodeId, ccb);
+        }
+
+        @Override
+        public void close() {
+            // Nothing to close
+        }
+
+        @Override
+        public void error(int ecode) {
+            // Errors should be handled in the application
+        }
+
+        private String readMessagingInitialMessage(ByteBuffer buffer) {
+            int nodeIdLength = buffer.getInt();
+            byte[] stringBytes = new byte[nodeIdLength];
+            buffer.get(stringBytes);
+            return new String(stringBytes);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index 4b28aef..325966a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -25,16 +25,17 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
-import org.apache.hyracks.net.exceptions.NetException;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
 import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
@@ -58,13 +59,13 @@
 
     private NetworkAddress publicNetworkAddress;
 
-    public NetworkManager(String inetAddress, int inetPort, PartitionManager partitionManager, int nThreads, int nBuffers,
-                          String publicInetAddress, int publicInetPort)
-            throws IOException {
+    public NetworkManager(String inetAddress, int inetPort, PartitionManager partitionManager, int nThreads,
+            int nBuffers, String publicInetAddress, int publicInetPort,
+            IChannelInterfaceFactory channelInterfaceFactory) {
         this.partitionManager = partitionManager;
         this.nBuffers = nBuffers;
         md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
-                MAX_CONNECTION_ATTEMPTS);
+                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
         // Just save these values for the moment; may be reset in start()
         publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
     }
@@ -78,12 +79,10 @@
         // make it a copy of localNetworkAddress
         if (publicNetworkAddress.getAddress() == null) {
             publicNetworkAddress = localNetworkAddress;
-        }
-        else {
+        } else {
             // Likewise for public port
             if (publicNetworkAddress.getPort() == 0) {
-                publicNetworkAddress = new NetworkAddress
-                    (publicNetworkAddress.getAddress(), sockAddr.getPort());
+                publicNetworkAddress = new NetworkAddress(publicNetworkAddress.getAddress(), sockAddr.getPort());
             }
         }
     }
@@ -100,6 +99,7 @@
 
     }
 
+    @Override
     public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
         MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
         return mConn.openChannel();
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
index 373fe21..13cb171 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -21,24 +21,22 @@
 
 import java.io.FileReader;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.util.HashMap;
 import java.util.Map;
 
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
-import org.xml.sax.InputSource;
-import org.xml.sax.SAXException;
-
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.api.topology.TopologyDefinitionParser;
+import org.apache.hyracks.hdfs.utils.TestUtils;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
 
 @SuppressWarnings("deprecation")
 public class SchedulerTest extends TestCase {
@@ -60,13 +58,8 @@
      * @throws Exception
      */
     public void testSchedulerSimple() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         InputSplit[] fileSplits = new InputSplit[6];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -98,13 +91,17 @@
      * @throws Exception
      */
     public void testSchedulerLargerHDFS() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc7", new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress("10.0.0.7", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc12", new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress("10.0.0.12", 5099), new NetworkAddress("10.0.0.5", 5098)));
+        int dataPort = 5099;
+        int resultPort = 5098;
+        int messagingPort = 5097;
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(4, "nc", "10.0.0.",
+                dataPort, resultPort, messagingPort);
+        ncNameToNcInfos.put("nc7",
+                new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress("10.0.0.7", dataPort),
+                        new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort)));
+        ncNameToNcInfos.put("nc12",
+                new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress("10.0.0.12", dataPort),
+                        new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort)));
 
         InputSplit[] fileSplits = new InputSplit[12];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -115,7 +112,8 @@
         fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
         fileSplits[6] = new FileSplit(new Path("part-7"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
         fileSplits[7] = new FileSplit(new Path("part-8"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
-        fileSplits[8] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.14", "10.0.0.11", "10.0.0.13" });
+        fileSplits[8] = new FileSplit(new Path("part-12"), 0, 0,
+                new String[] { "10.0.0.14", "10.0.0.11", "10.0.0.13" });
         fileSplits[9] = new FileSplit(new Path("part-10"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" });
         fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.7" });
         fileSplits[11] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
@@ -123,8 +121,8 @@
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc12",
-                "nc7", "nc7", "nc12" };
+        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc12", "nc7",
+                "nc7", "nc12" };
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
         }
@@ -145,13 +143,8 @@
      * @throws Exception
      */
     public void testSchedulerSmallerHDFS() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         InputSplit[] fileSplits = new InputSplit[12];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -191,13 +184,8 @@
      * @throws Exception
      */
     public void testSchedulerSmallerHDFSOdd() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         InputSplit[] fileSplits = new InputSplit[13];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -239,13 +227,8 @@
      * @throws Exception
      */
     public void testSchedulercBoundary() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         /** test empty file splits */
         InputSplit[] fileSplits = new InputSplit[0];
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
index c2ad4a0..8755cf3 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
@@ -22,6 +22,12 @@
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.comm.NetworkAddress;
 
 public class TestUtils {
 
@@ -38,8 +44,8 @@
                     throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
                 }
                 if (!equalStrings(lineExpected, lineActual)) {
-                    throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
-                            + lineActual);
+                    throw new Exception(
+                            "Result for changed at line " + num + ":\n< " + lineExpected + "\n> " + lineActual);
                 }
                 ++num;
             }
@@ -94,4 +100,16 @@
         return true;
     }
 
+    public static Map<String, NodeControllerInfo> generateNodeControllerInfo(int numberOfNodes, String ncNamePrefix,
+            String addressPrefix, int netPort, int dataPort, int messagingPort) {
+        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
+        for (int i = 1; i <= numberOfNodes; i++) {
+            String ncId = ncNamePrefix + i;
+            String ncAddress = addressPrefix + i;
+            ncNameToNcInfos.put(ncId,
+                    new NodeControllerInfo(ncId, NodeStatus.ALIVE, new NetworkAddress(ncAddress, netPort),
+                            new NetworkAddress(ncAddress, dataPort), new NetworkAddress(ncAddress, messagingPort)));
+        }
+        return ncNameToNcInfos;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
index 6eabb71..793e029 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -19,22 +19,18 @@
 
 package org.apache.hyracks.hdfs2.scheduler;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
 import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.client.NodeStatus;
-import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.hdfs.utils.TestUtils;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
 
 /**
  * Test case for the new HDFS API scheduler
@@ -47,13 +43,8 @@
      * @throws Exception
      */
     public void testSchedulerSimple() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -79,13 +70,8 @@
      * @throws Exception
      */
     public void testSchedulerLargerHDFS() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -118,13 +104,8 @@
      * @throws Exception
      */
     public void testSchedulerSmallerHDFS() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -157,13 +138,8 @@
      * @throws Exception
      */
     public void testSchedulerSmallerHDFSOdd() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
index a41b7e5..46494c3 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
@@ -42,6 +42,11 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-api</artifactId>
+      <version>0.2.18-SNAPSHOT</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
new file mode 100644
index 0000000..ff8d451
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.net.protocols.muxdemux;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IBufferFactory;
+import org.apache.hyracks.api.comm.IChannelReadInterface;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+
+public abstract class AbstractChannelReadInterface implements IChannelReadInterface {
+
+    protected ICloseableBufferAcceptor fba;
+    protected IBufferAcceptor emptyBufferAcceptor;
+    protected ByteBuffer currentReadBuffer;
+    protected IBufferFactory bufferFactory;
+    protected volatile int credits;
+
+    @Override
+    public void flush() {
+        if (currentReadBuffer != null) {
+            currentReadBuffer.flip();
+            fba.accept(currentReadBuffer);
+            currentReadBuffer = null;
+        }
+    }
+
+    public void reportError(int ecode) {
+        fba.error(ecode);
+    }
+
+    @Override
+    public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor) {
+        fba = fullBufferAcceptor;
+    }
+
+    @Override
+    public IBufferAcceptor getEmptyBufferAcceptor() {
+        return emptyBufferAcceptor;
+    }
+
+    @Override
+    public ICloseableBufferAcceptor getFullBufferAcceptor() {
+        return fba;
+    }
+
+    @Override
+    public int getCredits() {
+        return credits;
+    }
+
+    @Override
+    public void setReadCredits(int credits) {
+        this.credits = credits;
+    }
+
+    @Override
+    public IBufferFactory getBufferFactory() {
+        return bufferFactory;
+    }
+
+    @Override
+    public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize) {
+        this.bufferFactory = bufferFactory;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
new file mode 100644
index 0000000..e50ffd2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.net.protocols.muxdemux;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.IChannelWriteInterface;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+
+public abstract class AbstractChannelWriteInterface implements IChannelWriteInterface {
+
+    private static final Logger LOGGER = Logger.getLogger(AbstractChannelWriteInterface.class.getName());
+    protected final IChannelControlBlock ccb;
+    protected final Queue<ByteBuffer> wiFullQueue;
+    protected boolean channelWritabilityState;
+    protected final int channelId;
+    protected IBufferAcceptor eba;
+    protected int credits;
+    protected boolean eos;
+    protected boolean eosSent;
+    protected int ecode;
+    protected boolean ecodeSent;
+    protected ByteBuffer currentWriteBuffer;
+    private final ICloseableBufferAcceptor fba;
+
+    public AbstractChannelWriteInterface(IChannelControlBlock ccb) {
+        this.ccb = ccb;
+        this.channelId = ccb.getChannelId();
+        wiFullQueue = new ArrayDeque<>();
+        fba = new CloseableBufferAcceptor();
+        credits = 0;
+        eos = false;
+        eosSent = false;
+        ecode = -1;
+        ecodeSent = false;
+    }
+
+    @Override
+    public void writeComplete() {
+        if (currentWriteBuffer.remaining() <= 0) {
+            currentWriteBuffer.clear();
+            eba.accept(currentWriteBuffer);
+            currentWriteBuffer = null;
+            adjustChannelWritability();
+        }
+    }
+
+    private boolean computeWritability() {
+        boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty();
+        if (writableDataPresent) {
+            return credits > 0;
+        }
+        if (eos && !eosSent) {
+            return true;
+        }
+        if (ecode >= 0 && !ecodeSent) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void adjustChannelWritability() {
+        boolean writable = computeWritability();
+        if (writable) {
+            if (!channelWritabilityState) {
+                ccb.markPendingWrite();
+            }
+        } else {
+            if (channelWritabilityState) {
+                ccb.unmarkPendingWrite();
+            }
+        }
+        channelWritabilityState = writable;
+    }
+
+    @Override
+    public void addCredits(int credit) {
+        credits += credit;
+    }
+
+    @Override
+    public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor) {
+        eba = emptyBufferAcceptor;
+    }
+
+    @Override
+    public ICloseableBufferAcceptor getFullBufferAcceptor() {
+        return fba;
+    }
+
+    @Override
+    public int getCredits() {
+        return credits;
+    }
+
+    private class CloseableBufferAcceptor implements ICloseableBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            synchronized (ccb) {
+                wiFullQueue.add(buffer);
+                adjustChannelWritability();
+            }
+        }
+
+        @Override
+        public void close() {
+            synchronized (ccb) {
+                if (eos) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning("Received duplicate close() on channel: " + ccb.getChannelId());
+                    }
+                    return;
+                }
+                eos = true;
+                adjustChannelWritability();
+            }
+        }
+
+        @Override
+        public void error(int ecode) {
+            synchronized (ccb) {
+                AbstractChannelWriteInterface.this.ecode = ecode;
+                adjustChannelWritability();
+            }
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 6d82297..ba463d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -19,34 +19,29 @@
 package org.apache.hyracks.net.protocols.muxdemux;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
-import org.apache.hyracks.net.exceptions.NetException;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.comm.IChannelReadInterface;
+import org.apache.hyracks.api.comm.IChannelWriteInterface;
+import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection.WriterState;
 
 /**
  * Handle to a channel that represents a logical full-duplex communication end-point.
  *
  * @author vinayakb
  */
-public class ChannelControlBlock {
-    private static final Logger LOGGER = Logger.getLogger(ChannelControlBlock.class.getName());
-
+public class ChannelControlBlock implements IChannelControlBlock {
     private final ChannelSet cSet;
 
     private final int channelId;
 
-    private final ReadInterface ri;
+    private final IChannelReadInterface ri;
 
-    private final WriteInterface wi;
+    private final IChannelWriteInterface wi;
 
     private final AtomicBoolean localClose;
 
@@ -56,298 +51,38 @@
 
     private final AtomicBoolean remoteCloseAck;
 
-    ChannelControlBlock(ChannelSet cSet, int channelId) {
+    ChannelControlBlock(ChannelSet cSet, int channelId, IChannelInterfaceFactory interfaceFactory) {
         this.cSet = cSet;
         this.channelId = channelId;
-        this.ri = new ReadInterface();
-        this.wi = new WriteInterface();
         localClose = new AtomicBoolean();
         localCloseAck = new AtomicBoolean();
         remoteClose = new AtomicBoolean();
         remoteCloseAck = new AtomicBoolean();
+        this.ri = interfaceFactory.createReadInterface(this);
+        this.wi = interfaceFactory.createWriteInterface(this);
     }
 
-    int getChannelId() {
+    @Override
+    public int getChannelId() {
         return channelId;
     }
 
-    /**
-     * Get the read inderface of this channel.
-     *
-     * @return the read interface.
-     */
+    @Override
     public IChannelReadInterface getReadInterface() {
         return ri;
     }
 
-    /**
-     * Get the write interface of this channel.
-     *
-     * @return the write interface.
-     */
+    @Override
     public IChannelWriteInterface getWriteInterface() {
         return wi;
     }
 
-    private final class ReadInterface implements IChannelReadInterface {
-        private final Deque<ByteBuffer> riEmptyStack;
-
-        private final IBufferAcceptor eba = new IBufferAcceptor() {
-            @Override
-            public void accept(ByteBuffer buffer) {
-                int delta = buffer.remaining();
-                synchronized (ChannelControlBlock.this) {
-                    if (remoteClose.get()) {
-                        return;
-                    }
-                    riEmptyStack.push(buffer);
-                }
-                cSet.addPendingCredits(channelId, delta);
-            }
-        };
-
-        private ICloseableBufferAcceptor fba;
-
-        private volatile int credits;
-
-        private ByteBuffer currentReadBuffer;
-
-        private IBufferFactory bufferFactory;
-
-        ReadInterface() {
-            riEmptyStack = new ArrayDeque<ByteBuffer>();
-            credits = 0;
-        }
-
-        @Override
-        public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize) {
-            this.bufferFactory = bufferFactory;
-            cSet.addPendingCredits(channelId, limit * frameSize);
-        }
-
-        @Override
-        public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor) {
-            fba = fullBufferAcceptor;
-        }
-
-        @Override
-        public IBufferAcceptor getEmptyBufferAcceptor() {
-            return eba;
-        }
-
-        int read(SocketChannel sc, int size) throws IOException, NetException {
-            while (true) {
-                if (size <= 0) {
-                    return size;
-                }
-                if (currentReadBuffer == null) {
-                    currentReadBuffer = riEmptyStack.poll();
-                    //if current buffer == null and limit not reached
-                    // factory.createBuffer factory
-                    if (currentReadBuffer == null) {
-                        currentReadBuffer = bufferFactory.createBuffer();
-                    }
-                    assert currentReadBuffer != null;
-                }
-                int rSize = Math.min(size, currentReadBuffer.remaining());
-                if (rSize > 0) {
-                    currentReadBuffer.limit(currentReadBuffer.position() + rSize);
-                    int len;
-                    try {
-                        len = sc.read(currentReadBuffer);
-                        if (len < 0) {
-                            throw new NetException("Socket Closed");
-                        }
-                    } finally {
-                        currentReadBuffer.limit(currentReadBuffer.capacity());
-                    }
-                    size -= len;
-                    if (len < rSize) {
-                        return size;
-                    }
-                } else {
-                    return size;
-                }
-                if (currentReadBuffer.remaining() <= 0) {
-                    flush();
-                }
-            }
-        }
-
-        void flush() {
-            if (currentReadBuffer != null) {
-                currentReadBuffer.flip();
-                fba.accept(currentReadBuffer);
-                currentReadBuffer = null;
-            }
-        }
-    }
-
-    private final class WriteInterface implements IChannelWriteInterface {
-        private final Queue<ByteBuffer> wiFullQueue;
-
-        private boolean channelWritabilityState;
-
-        private IBufferFactory bufferFactory;
-
-        private final ICloseableBufferAcceptor fba = new ICloseableBufferAcceptor() {
-            @Override
-            public void accept(ByteBuffer buffer) {
-                synchronized (ChannelControlBlock.this) {
-                    wiFullQueue.add(buffer);
-                    adjustChannelWritability();
-                }
-            }
-
-            @Override
-            public void close() {
-                synchronized (ChannelControlBlock.this) {
-                    if (eos) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Received duplicate close() on channel: " + channelId);
-                        }
-                        return;
-                    }
-                    eos = true;
-                    adjustChannelWritability();
-                }
-            }
-
-            @Override
-            public void error(int ecode) {
-                synchronized (ChannelControlBlock.this) {
-                    WriteInterface.this.ecode = ecode;
-                    adjustChannelWritability();
-                }
-            }
-        };
-
-        private IBufferAcceptor eba;
-
-        private int credits;
-
-        private boolean eos;
-
-        private boolean eosSent;
-
-        private int ecode;
-
-        private boolean ecodeSent;
-
-        private ByteBuffer currentWriteBuffer;
-
-        WriteInterface() {
-            wiFullQueue = new ArrayDeque<ByteBuffer>();
-            credits = 0;
-            eos = false;
-            eosSent = false;
-            ecode = -1;
-            ecodeSent = false;
-        }
-
-        @Override
-        public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize) {
-            this.bufferFactory = bufferFactory;
-            if (!channelWritabilityState) {
-                cSet.markPendingWrite(channelId);
-            }
-            channelWritabilityState = true;
-            if (eos) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Received duplicate close() on channel: " + channelId);
-                }
-                return;
-            }
-            eos = true;
-        }
-
-        @Override
-        public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor) {
-            eba = emptyBufferAcceptor;
-        }
-
-        @Override
-        public ICloseableBufferAcceptor getFullBufferAcceptor() {
-            return fba;
-        }
-
-        void write(MultiplexedConnection.WriterState writerState) throws NetException {
-            if (currentWriteBuffer == null) {
-                currentWriteBuffer = wiFullQueue.poll();
-            }
-            if (currentWriteBuffer != null) {
-                int size = Math.min(currentWriteBuffer.remaining(), credits);
-                if (size > 0) {
-                    credits -= size;
-                    writerState.command.setChannelId(channelId);
-                    writerState.command.setCommandType(MuxDemuxCommand.CommandType.DATA);
-                    writerState.command.setData(size);
-                    writerState.reset(currentWriteBuffer, size, ChannelControlBlock.this);
-                } else {
-                    adjustChannelWritability();
-                }
-            } else if (ecode >= 0 && !ecodeSent) {
-                writerState.command.setChannelId(channelId);
-                writerState.command.setCommandType(MuxDemuxCommand.CommandType.ERROR);
-                writerState.command.setData(ecode);
-                writerState.reset(null, 0, null);
-                ecodeSent = true;
-                localClose.set(true);
-                adjustChannelWritability();
-            } else if (eos && !eosSent) {
-                writerState.command.setChannelId(channelId);
-                writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
-                writerState.command.setData(0);
-                writerState.reset(null, 0, null);
-                eosSent = true;
-                localClose.set(true);
-                adjustChannelWritability();
-            }
-        }
-
-        void writeComplete() {
-            if (currentWriteBuffer.remaining() <= 0) {
-                currentWriteBuffer.clear();
-                eba.accept(currentWriteBuffer);
-                currentWriteBuffer = null;
-                adjustChannelWritability();
-            }
-        }
-
-        private boolean computeWritability() {
-            boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty();
-            if (writableDataPresent) {
-                return credits > 0;
-            }
-            if (eos && !eosSent) {
-                return true;
-            }
-            if (ecode >= 0 && !ecodeSent) {
-                return true;
-            }
-            return false;
-        }
-
-        void adjustChannelWritability() {
-            boolean writable = computeWritability();
-            if (writable) {
-                if (!channelWritabilityState) {
-                    cSet.markPendingWrite(channelId);
-                }
-            } else {
-                if (channelWritabilityState) {
-                    cSet.unmarkPendingWrite(channelId);
-                }
-            }
-            channelWritabilityState = writable;
-        }
-    }
-
-    synchronized void write(MultiplexedConnection.WriterState writerState) throws NetException {
+    synchronized void write(WriterState writerState) throws NetException {
         wi.write(writerState);
     }
 
-    synchronized void writeComplete() {
+    @Override
+    public synchronized void writeComplete() {
         wi.writeComplete();
     }
 
@@ -356,21 +91,22 @@
     }
 
     int getReadCredits() {
-        return ri.credits;
+        return ri.getCredits();
     }
 
     void setReadCredits(int credits) {
-        this.ri.credits = credits;
+        ri.setReadCredits(credits);
     }
 
-    synchronized void addWriteCredits(int delta) {
-        wi.credits += delta;
+    @Override
+    public synchronized void addWriteCredits(int delta) {
+        wi.addCredits(delta);
         wi.adjustChannelWritability();
     }
 
     synchronized void reportRemoteEOS() {
         ri.flush();
-        ri.fba.close();
+        ri.getFullBufferAcceptor().close();
         remoteClose.set(true);
     }
 
@@ -388,7 +124,7 @@
 
     synchronized void reportRemoteError(int ecode) {
         ri.flush();
-        ri.fba.error(ecode);
+        ri.getFullBufferAcceptor().error(ecode);
         remoteClose.set(true);
     }
 
@@ -397,9 +133,34 @@
     }
 
     @Override
+    public boolean isRemotelyClosed() {
+        return remoteCloseAck.get();
+    }
+
+    @Override
+    public void reportLocalEOS() {
+        localClose.set(true);
+    }
+
+    @Override
+    public void addPendingCredits(int credit) {
+        cSet.addPendingCredits(channelId, credit);
+    }
+
+    @Override
+    public void unmarkPendingWrite() {
+        cSet.unmarkPendingWrite(channelId);
+    }
+
+    @Override
+    public void markPendingWrite() {
+        cSet.markPendingWrite(channelId);
+    }
+
+    @Override
     public String toString() {
         return "Channel:" + channelId + "[localClose: " + localClose + " localCloseAck: " + localCloseAck
-                + " remoteClose: " + remoteClose + " remoteCloseAck:" + remoteCloseAck + " readCredits: " + ri.credits
-                + " writeCredits: " + wi.credits + "]";
+                + " remoteClose: " + remoteClose + " remoteCloseAck:" + remoteCloseAck + " readCredits: "
+                + ri.getCredits() + " writeCredits: " + wi.getCredits() + "]";
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
index 52f3417..49bb292 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -23,7 +23,9 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hyracks.net.exceptions.NetException;
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.comm.MuxDemuxCommand;
+import org.apache.hyracks.api.exceptions.NetException;
 
 public class ChannelSet {
     private static final Logger LOGGER = Logger.getLogger(ChannelSet.class.getName());
@@ -224,7 +226,8 @@
         }
         assert idx < ccbArray.length;
         assert !allocationBitmap.get(idx);
-        ChannelControlBlock channel = new ChannelControlBlock(this, idx);
+        IChannelInterfaceFactory channelInterfaceFactory = mConn.getChannelInterfaceFactory();
+        ChannelControlBlock channel = new ChannelControlBlock(this, idx, channelInterfaceFactory);
         ccbArray[idx] = channel;
         allocationBitmap.set(idx);
         ++openChannelCount;
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelInterfaceFactory.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelInterfaceFactory.java
new file mode 100644
index 0000000..459fe51
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelInterfaceFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.net.protocols.muxdemux;
+
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.comm.IChannelReadInterface;
+import org.apache.hyracks.api.comm.IChannelWriteInterface;
+
+public class FullFrameChannelInterfaceFactory implements IChannelInterfaceFactory {
+
+    public static final IChannelInterfaceFactory INSTANCE = new FullFrameChannelInterfaceFactory();
+
+    @Override
+    public IChannelReadInterface createReadInterface(IChannelControlBlock cbb) {
+        return new FullFrameChannelReadInterface(cbb);
+    }
+
+    @Override
+    public IChannelWriteInterface createWriteInterface(IChannelControlBlock cbb) {
+        return new FullFrameChannelWriteInterface(cbb);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
new file mode 100644
index 0000000..432382a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.net.protocols.muxdemux;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import org.apache.hyracks.api.comm.IBufferFactory;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.exceptions.NetException;
+
+public class FullFrameChannelReadInterface extends AbstractChannelReadInterface {
+
+    private final Deque<ByteBuffer> riEmptyStack;
+    private final IChannelControlBlock ccb;
+
+    FullFrameChannelReadInterface(IChannelControlBlock ccb) {
+        this.ccb = ccb;
+        riEmptyStack = new ArrayDeque<>();
+        credits = 0;
+
+        emptyBufferAcceptor = buffer -> {
+            int delta = buffer.remaining();
+            synchronized (ccb) {
+                if (ccb.isRemotelyClosed()) {
+                    return;
+                }
+                riEmptyStack.push(buffer);
+            }
+            ccb.addPendingCredits(delta);
+        };
+    }
+
+    @Override
+    public int read(SocketChannel sc, int size) throws IOException, NetException {
+        while (true) {
+            if (size <= 0) {
+                return size;
+            }
+            if (currentReadBuffer == null) {
+                currentReadBuffer = riEmptyStack.poll();
+                //if current buffer == null and limit not reached
+                // factory.createBuffer factory
+                if (currentReadBuffer == null) {
+                    currentReadBuffer = bufferFactory.createBuffer();
+                }
+            }
+            int rSize = Math.min(size, currentReadBuffer.remaining());
+            if (rSize > 0) {
+                currentReadBuffer.limit(currentReadBuffer.position() + rSize);
+                int len;
+                try {
+                    len = sc.read(currentReadBuffer);
+                    if (len < 0) {
+                        throw new NetException("Socket Closed");
+                    }
+                } finally {
+                    currentReadBuffer.limit(currentReadBuffer.capacity());
+                }
+                size -= len;
+                if (len < rSize) {
+                    return size;
+                }
+            } else {
+                return size;
+            }
+            if (currentReadBuffer.remaining() <= 0) {
+                flush();
+            }
+        }
+    }
+
+    @Override
+    public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize) {
+        super.setBufferFactory(bufferFactory, limit, frameSize);
+        ccb.addPendingCredits(limit * frameSize);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
new file mode 100644
index 0000000..418ebd7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.net.protocols.muxdemux;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.comm.IBufferFactory;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.IConnectionWriterState;
+import org.apache.hyracks.api.comm.MuxDemuxCommand;
+import org.apache.hyracks.api.exceptions.NetException;
+
+public class FullFrameChannelWriteInterface extends AbstractChannelWriteInterface {
+
+    private static final Logger LOGGER = Logger.getLogger(FullFrameChannelWriteInterface.class.getName());
+
+    FullFrameChannelWriteInterface(IChannelControlBlock ccb) {
+        super(ccb);
+    }
+
+    @Override
+    public void write(IConnectionWriterState writerState) throws NetException {
+        if (currentWriteBuffer == null) {
+            currentWriteBuffer = wiFullQueue.poll();
+        }
+        if (currentWriteBuffer != null) {
+            int size = Math.min(currentWriteBuffer.remaining(), credits);
+            if (size > 0) {
+                credits -= size;
+                writerState.getCommand().setChannelId(channelId);
+                writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.DATA);
+                writerState.getCommand().setData(size);
+                writerState.reset(currentWriteBuffer, size, ccb);
+            } else {
+                adjustChannelWritability();
+            }
+        } else if (ecode >= 0 && !ecodeSent) {
+            writerState.getCommand().setChannelId(channelId);
+            writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
+            writerState.getCommand().setData(ecode);
+            writerState.reset(null, 0, null);
+            ecodeSent = true;
+            ccb.reportLocalEOS();
+            adjustChannelWritability();
+        } else if (eos && !eosSent) {
+            writerState.getCommand().setChannelId(channelId);
+            writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
+            writerState.getCommand().setData(0);
+            writerState.reset(null, 0, null);
+            eosSent = true;
+            ccb.reportLocalEOS();
+            adjustChannelWritability();
+        }
+    }
+
+    @Override
+    public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize) {
+        if (!channelWritabilityState) {
+            ccb.markPendingWrite();
+        }
+        channelWritabilityState = true;
+        if (eos) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Received duplicate close() on channel: " + channelId);
+            }
+            return;
+        }
+        eos = true;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 866f2c6..ad8aabd 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -26,7 +26,11 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hyracks.net.exceptions.NetException;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.comm.IConnectionWriterState;
+import org.apache.hyracks.api.comm.MuxDemuxCommand;
+import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
 import org.apache.hyracks.net.protocols.tcp.TCPConnection;
 
@@ -121,8 +125,8 @@
     }
 
     @Override
-    public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException,
-            NetException {
+    public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable)
+            throws IOException, NetException {
         if (readable) {
             driveReaderStateMachine();
         }
@@ -157,7 +161,7 @@
         return channel;
     }
 
-    class WriterState {
+    class WriterState implements IConnectionWriterState {
         private final ByteBuffer cmdWriteBuffer;
 
         final MuxDemuxCommand command;
@@ -166,7 +170,7 @@
 
         private int pendingWriteSize;
 
-        private ChannelControlBlock ccb;
+        private IChannelControlBlock ccb;
 
         public WriterState() {
             cmdWriteBuffer = ByteBuffer.allocateDirect(MuxDemuxCommand.COMMAND_SIZE);
@@ -179,7 +183,8 @@
             return cmdWriteBuffer.remaining() > 0 || (pendingBuffer != null && pendingWriteSize > 0);
         }
 
-        void reset(ByteBuffer pendingBuffer, int pendingWriteSize, ChannelControlBlock ccb) {
+        @Override
+        public void reset(ByteBuffer pendingBuffer, int pendingWriteSize, IChannelControlBlock ccb) {
             cmdWriteBuffer.clear();
             command.write(cmdWriteBuffer);
             cmdWriteBuffer.flip();
@@ -222,6 +227,11 @@
             }
             return true;
         }
+
+        @Override
+        public MuxDemuxCommand getCommand() {
+            return command;
+        }
     }
 
     void driveWriterStateMachine() throws IOException, NetException {
@@ -414,4 +424,8 @@
             readerState.reset();
         }
     }
-}
+
+    public IChannelInterfaceFactory getChannelInterfaceFactory() {
+        return muxDemux.getChannelInterfaceFactory();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index 0ea7b13..7e6c8d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -23,7 +23,8 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hyracks.net.exceptions.NetException;
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.net.protocols.tcp.ITCPConnectionListener;
 import org.apache.hyracks.net.protocols.tcp.TCPConnection;
 import org.apache.hyracks.net.protocols.tcp.TCPEndpoint;
@@ -48,6 +49,8 @@
 
     private final MuxDemuxPerformanceCounters perfCounters;
 
+    private final IChannelInterfaceFactory channelInterfaceFatory;
+
     /**
      * Constructor.
      *
@@ -61,11 +64,12 @@
      *            - Maximum number of connection attempts
      */
     public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener, int nThreads,
-            int maxConnectionAttempts) {
+            int maxConnectionAttempts, IChannelInterfaceFactory channelInterfaceFatory) {
         this.localAddress = localAddress;
         this.channelOpenListener = listener;
         this.maxConnectionAttempts = maxConnectionAttempts;
-        connectionMap = new HashMap<InetSocketAddress, MultiplexedConnection>();
+        this.channelInterfaceFatory = channelInterfaceFatory;
+        connectionMap = new HashMap<>();
         this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() {
             @Override
             public void connectionEstablished(TCPConnection connection) {
@@ -129,7 +133,7 @@
      * @throws NetException
      */
     public MultiplexedConnection connect(InetSocketAddress remoteAddress) throws InterruptedException, NetException {
-        MultiplexedConnection mConn = null;
+        MultiplexedConnection mConn;
         synchronized (this) {
             mConn = connectionMap.get(remoteAddress);
             if (mConn == null) {
@@ -163,4 +167,14 @@
     public MuxDemuxPerformanceCounters getPerformanceCounters() {
         return perfCounters;
     }
-}
+
+    /**
+     * Gets the channel interface factory associated with channels
+     * created by this {@link MuxDemux}.
+     *
+     * @return
+     */
+    public IChannelInterfaceFactory getChannelInterfaceFactory() {
+        return channelInterfaceFatory;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
index 59b1fbb..eaff865 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
@@ -20,11 +20,11 @@
 
 import java.io.IOException;
 
-import org.apache.hyracks.net.exceptions.NetException;
+import org.apache.hyracks.api.exceptions.NetException;
 
 public interface ITCPConnectionEventListener {
-    public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException,
-            NetException;
+    public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable)
+            throws IOException, NetException;
 
     public void notifyIOError(Exception e);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
index f1f3c4a..ab018db 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
@@ -25,16 +25,16 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import junit.framework.Assert;
-
-import org.junit.Test;
-
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
 import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemux;
+import org.junit.Test;
+
+import junit.framework.Assert;
 
 public class NetTest {
     @Test
@@ -162,7 +162,8 @@
                 }.start();
             }
         };
-        return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), md1OpenListener, 1, 5);
+        return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), md1OpenListener, 1, 5,
+                FullFrameChannelInterfaceFactory.INSTANCE);
     }
 
     private class ChannelIO {
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
index c6a4430..e4470ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.application.IApplicationConfig;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.application.IStateDumpHandler;
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
 import org.apache.hyracks.api.context.IHyracksRootContext;
 import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
@@ -140,4 +141,14 @@
     public IControllerService getControllerService() {
         return null;
     }
+
+    @Override
+    public IChannelInterfaceFactory getMessagingChannelInterfaceFactory() {
+        return null;
+    }
+
+    @Override
+    public void setMessagingChannelInterfaceFactory(IChannelInterfaceFactory interfaceFactory) {
+        // do nothing
+    }
 }