diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
index eb6b888..473f3ae 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
@@ -54,7 +54,7 @@
      * nbytes the actual data.
      * If the tupleLength includes the field slot, please set the fieldCount = 0
      */
-    public static int calcSpaceInFrame(int fieldCount, int tupleLength) {
+    public static int calcRequiredSpace(int fieldCount, int tupleLength) {
         return 4 + fieldCount * 4 + tupleLength;
     }
 
@@ -68,7 +68,7 @@
      */
     public static int calcAlignedFrameSizeToStore(int fieldCount, int tupleLength, int minFrameSize) {
         assert fieldCount >= 0 && tupleLength >= 0 && minFrameSize > 0;
-        return (1 + (calcSpaceInFrame(fieldCount, tupleLength) + FrameConstants.META_DATA_LEN - 1) / minFrameSize)
+        return (1 + (calcRequiredSpace(fieldCount, tupleLength) + FrameConstants.META_DATA_LEN - 1) / minFrameSize)
                 * minFrameSize;
     }
 
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index fd1d376..274d446 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -41,4 +41,8 @@
     public IDatasetPartitionManager getDatasetPartitionManager();
 
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId) throws Exception;
+
+    public void setSharedObject(Object sharedObject);
+
+    public Object getSharedObject();
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
index 81a0290..339eb9d 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -21,9 +21,6 @@
 import java.io.Serializable;
 import java.util.BitSet;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.IPartitionCollector;
@@ -33,6 +30,8 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.ActivityCluster;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 /**
  * Connector that connects operators in a Job.
@@ -40,6 +39,7 @@
  * @author vinayakb
  */
 public interface IConnectorDescriptor extends Serializable {
+
     /**
      * Gets the id of the connector.
      *
@@ -68,7 +68,7 @@
      */
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException;
+                    throws HyracksDataException;
 
     /**
      * Factory metod to create the receive side reader that reads data from this
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 61baf82..e99fea8 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -95,6 +95,8 @@
 
     private List<List<PartitionChannel>> inputChannelsFromConnectors;
 
+    private Object sharedObject;
+
     public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor,
             NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) {
         this.joblet = joblet;
@@ -383,4 +385,14 @@
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId) throws Exception {
         this.ncs.sendApplicationMessageToCC(message, deploymentId);
     }
+
+    @Override
+    public void setSharedObject(Object sharedObject) {
+        this.sharedObject = sharedObject;
+    }
+
+    @Override
+    public Object getSharedObject() {
+        return sharedObject;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index fd71716..1553605 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -29,6 +29,18 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 
+/*
+ * Frame
+ *  _____________________________________________
+ * |[tuple1][tuple2][tuple3].........            |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |..[tupleN][tuplesOffsets(4*N)][tupleCount(4)]|
+ * |_____________________________________________|
+ */
 public class AbstractFrameAppender implements IFrameAppender {
     protected IFrame frame;
     protected byte[] array; // cached the getBuffer().array to speed up byte array access a little
@@ -46,7 +58,7 @@
     }
 
     protected boolean hasEnoughSpace(int fieldCount, int tupleLength) {
-        return tupleDataEndOffset + FrameHelper.calcSpaceInFrame(fieldCount, tupleLength)
+        return tupleDataEndOffset + FrameHelper.calcRequiredSpace(fieldCount, tupleLength)
                 + tupleCount * FrameConstants.SIZE_LEN <= FrameHelper.getTupleCountOffset(frame.getFrameSize());
     }
 
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index c9c51d3..136e231 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -40,6 +40,10 @@
         reset(frame, clear);
     }
 
+    /**
+     * append fieldSlots and bytes to the current frame
+     */
+    @Override
     public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException {
         if (canHoldNewTuple(fieldSlots.length, length)) {
             for (int i = 0; i < fieldSlots.length; ++i) {
@@ -50,27 +54,28 @@
             IntSerDeUtils.putInt(getBuffer().array(),
                     FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()),
+                    tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(byte[] bytes, int offset, int length) throws HyracksDataException {
         if (canHoldNewTuple(0, length)) {
             System.arraycopy(bytes, offset, getBuffer().array(), tupleDataEndOffset, length);
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length)
             throws HyracksDataException {
         if (canHoldNewTuple(fieldSlots.length, length)) {
@@ -83,17 +88,16 @@
             }
             System.arraycopy(bytes, offset, array, tupleDataEndOffset + effectiveSlots * 4, length);
             tupleDataEndOffset += effectiveSlots * 4 + length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
                     tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset)
             throws HyracksDataException {
         int length = tEndOffset - tStartOffset;
@@ -101,25 +105,25 @@
             ByteBuffer src = tupleAccessor.getBuffer();
             System.arraycopy(src.array(), tStartOffset, array, tupleDataEndOffset, length);
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
                     tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
         int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
         int tEndOffset = tupleAccessor.getTupleEndOffset(tIndex);
         return append(tupleAccessor, tStartOffset, tEndOffset);
     }
 
-    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1,
-            int tIndex1) throws HyracksDataException {
+    @Override
+    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
         int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
         int length0 = endOffset0 - startOffset0;
@@ -143,22 +147,22 @@
                         src1.getInt(startOffset1 + i * 4) + dataLen0);
             }
             // Copy data0
-            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1, dataLen0);
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
+                    dataLen0);
             // Copy data1
-            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1 + dataLen0, dataLen1);
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array,
+                    tupleDataEndOffset + slotsLen0 + slotsLen1 + dataLen0, dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1,
             int offset1, int dataLen1) throws HyracksDataException {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
@@ -176,21 +180,19 @@
             System.arraycopy(src0.array(), startOffset0, array, tupleDataEndOffset, slotsLen0);
             // Copy fieldSlots1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
             for (int i = 0; i < fieldSlots1.length; ++i) {
-                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4,
-                        (fieldSlots1[i] + dataLen0));
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots1[i] + dataLen0));
             }
             // Copy data0
-            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1, dataLen0);
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
+                    dataLen0);
             // Copy bytes1
-            System.arraycopy(bytes1, offset1, array,
-                    tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4 + dataLen0, dataLen1);
+            System.arraycopy(bytes1, offset1, array, tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4 + dataLen0,
+                    dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
@@ -219,22 +221,21 @@
                         src1.getInt(startOffset1 + i * 4) + dataLen0);
             }
             // Copy bytes0
-            System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
-                    dataLen0);
+            System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + slotsLen0 + slotsLen1, dataLen0);
             // Copy data1
-            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1 + dataLen0, dataLen1);
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array,
+                    tupleDataEndOffset + slotsLen0 + slotsLen1 + dataLen0, dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields)
             throws HyracksDataException {
         int fTargetSlotsLength = fields.length * 4;
@@ -253,18 +254,17 @@
                 int fSrcStart = tStartOffset + fSrcSlotsLength + accessor.getFieldStartOffset(tIndex, fields[i]);
                 int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
                         - accessor.getFieldStartOffset(tIndex, fields[i]);
-                System.arraycopy(accessor.getBuffer().array(), fSrcStart, array, tupleDataEndOffset
-                        + fTargetSlotsLength + fStartOffset, fLen);
+                System.arraycopy(accessor.getBuffer().array(), fSrcStart, array,
+                        tupleDataEndOffset + fTargetSlotsLength + fStartOffset, fLen);
                 fEndOffset += fLen;
                 IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fEndOffset);
                 fStartOffset = fEndOffset;
             }
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
new file mode 100644
index 0000000..7100c11
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+
+public class MessagingFrameTupleAppender extends FrameTupleAppender {
+
+    public static final int MAX_MESSAGE_SIZE = 100;
+    private final IHyracksTaskContext ctx;
+
+    public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
+        this.ctx = ctx;
+    }
+
+    @Override
+    protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws HyracksDataException {
+        if (hasEnoughSpace(fieldCount, dataLength + MAX_MESSAGE_SIZE)) {
+            return true;
+        }
+        if (tupleCount == 0) {
+            frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount, dataLength + MAX_MESSAGE_SIZE,
+                    frame.getMinSize()));
+            reset(frame.getBuffer(), true);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
+        if (tupleCount > 0) {
+            appendMessage();
+            getBuffer().clear();
+            outWriter.nextFrame(getBuffer());
+            if (clearFrame) {
+                frame.reset();
+                reset(getBuffer(), true);
+            }
+        }
+    }
+
+    public void appendMessage() {
+        ByteBuffer message = (ByteBuffer) ctx.getSharedObject();
+        System.arraycopy(message.array(), message.position(), array, tupleDataEndOffset, message.limit());
+        tupleDataEndOffset += message.limit();
+        IntSerDeUtils.putInt(getBuffer().array(),
+                FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+        ++tupleCount;
+        IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+    }
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
index b808ac1..d2100bb 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
@@ -26,7 +26,14 @@
                 + ((bytes[offset + 3] & 0xff) << 0);
     }
 
+    /**
+     * put integer value into the array bytes at the offset offset
+     * @param bytes byte array to put data in
+     * @param offset offset from the beginning of the array to write the {@code value} in
+     * @param value value to write to {@code bytes[offset]}
+     */
     public static void putInt(byte[] bytes, int offset, int value) {
+
         bytes[offset++] = (byte) (value >> 24);
         bytes[offset++] = (byte) (value >> 16);
         bytes[offset++] = (byte) (value >> 8);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
index cf4808f..7d97507 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
@@ -44,7 +44,7 @@
     }
 
     @Override
-    public boolean allProducersToAllConsumers(){
+    public boolean allProducersToAllConsumers() {
         return true;
     }
 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
index 7856d6a..44d77ac 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
@@ -41,8 +41,8 @@
 
     private ITuplePartitionComputerFactory tpcf;
 
-    public LocalityAwareMToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf,
-            ILocalityMap localityMap) {
+    public LocalityAwareMToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITuplePartitionComputerFactory tpcf, ILocalityMap localityMap) {
         super(spec);
         this.localityMap = localityMap;
         this.tpcf = tpcf;
@@ -60,7 +60,7 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         return new LocalityAwarePartitionDataWriter(ctx, edwFactory, recordDesc, tpcf.createPartitioner(),
                 nConsumerPartitions, localityMap, index);
     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
index 22a4c1c..d5e4e20 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
@@ -35,7 +35,7 @@
 
 public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor {
     private static final long serialVersionUID = 1L;
-    private ITuplePartitionComputerFactory tpcf;
+    protected ITuplePartitionComputerFactory tpcf;
 
     public MToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf) {
         super(spec);
@@ -45,15 +45,13 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
-        final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory,
-                recordDesc, tpcf.createPartitioner());
-        return hashWriter;
+                    throws HyracksDataException {
+        return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
     }
 
     @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
-            int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(0, nProducerPartitions);
         NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
@@ -61,4 +59,8 @@
         NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
         return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
     }
+
+    public ITuplePartitionComputerFactory getTuplePartitionComputerFactory() {
+        return tpcf;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
new file mode 100644
index 0000000..e90d8b0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+
+public class MToNPartitioningWithMessageConnectorDescriptor extends MToNPartitioningConnectorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public MToNPartitioningWithMessageConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITuplePartitionComputerFactory tpcf) {
+        super(spec, tpcf);
+    }
+
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+                    throws HyracksDataException {
+        return new PartitionWithMessageDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
+                tpcf.createPartitioner());
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index cfa0cf9..dde29c1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -48,13 +48,13 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         return edwFactory.createFrameWriter(index);
     }
 
     @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
-            int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(index);
         NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
@@ -69,8 +69,8 @@
         OperatorDescriptorId consumer = ac.getConsumerActivity(getConnectorId()).getOperatorDescriptorId();
         OperatorDescriptorId producer = ac.getProducerActivity(getConnectorId()).getOperatorDescriptorId();
 
-        constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(consumer),
-                new PartitionCountExpression(producer)));
+        constraintAcceptor.addConstraint(
+                new Constraint(new PartitionCountExpression(consumer), new PartitionCountExpression(producer)));
     }
 
     @Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 646883f..f84c3e4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -51,7 +51,7 @@
         for (int i = 0; i < consumerPartitionCount; ++i) {
             try {
                 pWriters[i] = pwFactory.createFrameWriter(i);
-                appenders[i] = new FrameTupleAppender();
+                appenders[i] = createTupleAppender(ctx);
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
@@ -61,6 +61,10 @@
         this.ctx = ctx;
     }
 
+    protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
+        return new FrameTupleAppender();
+    }
+
     @Override
     public void close() throws HyracksDataException {
         HyracksDataException closeException = null;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
new file mode 100644
index 0000000..4055fb0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+
+public class PartitionWithMessageDataWriter extends PartitionDataWriter {
+
+    public PartitionWithMessageDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount,
+            IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc)
+                    throws HyracksDataException {
+        super(ctx, consumerPartitionCount, pwFactory, recordDescriptor, tpc);
+    }
+
+    @Override
+    protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
+        return new MessagingFrameTupleAppender(ctx);
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index 0960927..d121ec4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -26,7 +26,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -52,7 +52,7 @@
     }
 
     @Override
-    public ITupleParser createTupleParser(final IHyracksCommonContext ctx) {
+    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
         return new ITupleParser() {
             @Override
             public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
index daf5104..f495b75 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
@@ -20,9 +20,9 @@
 
 import java.io.Serializable;
 
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface ITupleParserFactory extends Serializable {
-    public ITupleParser createTupleParser(IHyracksCommonContext ctx) throws HyracksDataException;
+    public ITupleParser createTupleParser(IHyracksTaskContext ctx) throws HyracksDataException;
 }
diff --git a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
index eb2714f..4558cf9 100644
--- a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
+++ b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
@@ -27,7 +27,7 @@
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -41,7 +41,7 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public ITupleParser createTupleParser(final IHyracksCommonContext ctx) {
+    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
         return new ITupleParser() {
             @Override
             public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 6d954eb..9848ffb 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -137,4 +137,13 @@
     public ExecutorService getExecutorService() {
         return null;
     }
+
+    @Override
+    public Object getSharedObject() {
+        return null;
+    }
+
+    @Override
+    public void setSharedObject(Object sharedObject) {
+    }
 }
\ No newline at end of file
