Support Sending Messages Alongside Frame Data

This change support sending messages with records. The tuple Appender
reserves 100 bytes for a message. Before sending the frame, it appends
The message in the last tuple position. The message is read from the
task context as the shared object between different operators in the
pipeline. The first use of this feature will be within feeds to request
acks for at least once semantics.

Change-Id: I56ae8124052c13a52ca42965b8d00e18ecf35a28
Reviewed-on: https://asterix-gerrit.ics.uci.edu/604
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <michael.blow@couchbase.com>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
index eb6b888..473f3ae 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/FrameHelper.java
@@ -54,7 +54,7 @@
      * nbytes the actual data.
      * If the tupleLength includes the field slot, please set the fieldCount = 0
      */
-    public static int calcSpaceInFrame(int fieldCount, int tupleLength) {
+    public static int calcRequiredSpace(int fieldCount, int tupleLength) {
         return 4 + fieldCount * 4 + tupleLength;
     }
 
@@ -68,7 +68,7 @@
      */
     public static int calcAlignedFrameSizeToStore(int fieldCount, int tupleLength, int minFrameSize) {
         assert fieldCount >= 0 && tupleLength >= 0 && minFrameSize > 0;
-        return (1 + (calcSpaceInFrame(fieldCount, tupleLength) + FrameConstants.META_DATA_LEN - 1) / minFrameSize)
+        return (1 + (calcRequiredSpace(fieldCount, tupleLength) + FrameConstants.META_DATA_LEN - 1) / minFrameSize)
                 * minFrameSize;
     }
 
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index fd1d376..274d446 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -41,4 +41,8 @@
     public IDatasetPartitionManager getDatasetPartitionManager();
 
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId) throws Exception;
+
+    public void setSharedObject(Object sharedObject);
+
+    public Object getSharedObject();
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
index 81a0290..339eb9d 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -21,9 +21,6 @@
 import java.io.Serializable;
 import java.util.BitSet;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.IPartitionCollector;
@@ -33,6 +30,8 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.ActivityCluster;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 /**
  * Connector that connects operators in a Job.
@@ -40,6 +39,7 @@
  * @author vinayakb
  */
 public interface IConnectorDescriptor extends Serializable {
+
     /**
      * Gets the id of the connector.
      *
@@ -68,7 +68,7 @@
      */
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException;
+                    throws HyracksDataException;
 
     /**
      * Factory metod to create the receive side reader that reads data from this
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 61baf82..e99fea8 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -95,6 +95,8 @@
 
     private List<List<PartitionChannel>> inputChannelsFromConnectors;
 
+    private Object sharedObject;
+
     public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor,
             NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) {
         this.joblet = joblet;
@@ -383,4 +385,14 @@
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId) throws Exception {
         this.ncs.sendApplicationMessageToCC(message, deploymentId);
     }
+
+    @Override
+    public void setSharedObject(Object sharedObject) {
+        this.sharedObject = sharedObject;
+    }
+
+    @Override
+    public Object getSharedObject() {
+        return sharedObject;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index fd71716..1553605 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -29,6 +29,18 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 
+/*
+ * Frame
+ *  _____________________________________________
+ * |[tuple1][tuple2][tuple3].........            |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |                      .                      |
+ * |..[tupleN][tuplesOffsets(4*N)][tupleCount(4)]|
+ * |_____________________________________________|
+ */
 public class AbstractFrameAppender implements IFrameAppender {
     protected IFrame frame;
     protected byte[] array; // cached the getBuffer().array to speed up byte array access a little
@@ -46,7 +58,7 @@
     }
 
     protected boolean hasEnoughSpace(int fieldCount, int tupleLength) {
-        return tupleDataEndOffset + FrameHelper.calcSpaceInFrame(fieldCount, tupleLength)
+        return tupleDataEndOffset + FrameHelper.calcRequiredSpace(fieldCount, tupleLength)
                 + tupleCount * FrameConstants.SIZE_LEN <= FrameHelper.getTupleCountOffset(frame.getFrameSize());
     }
 
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index c9c51d3..136e231 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -40,6 +40,10 @@
         reset(frame, clear);
     }
 
+    /**
+     * append fieldSlots and bytes to the current frame
+     */
+    @Override
     public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException {
         if (canHoldNewTuple(fieldSlots.length, length)) {
             for (int i = 0; i < fieldSlots.length; ++i) {
@@ -50,27 +54,28 @@
             IntSerDeUtils.putInt(getBuffer().array(),
                     FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()),
+                    tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(byte[] bytes, int offset, int length) throws HyracksDataException {
         if (canHoldNewTuple(0, length)) {
             System.arraycopy(bytes, offset, getBuffer().array(), tupleDataEndOffset, length);
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendSkipEmptyField(int[] fieldSlots, byte[] bytes, int offset, int length)
             throws HyracksDataException {
         if (canHoldNewTuple(fieldSlots.length, length)) {
@@ -83,17 +88,16 @@
             }
             System.arraycopy(bytes, offset, array, tupleDataEndOffset + effectiveSlots * 4, length);
             tupleDataEndOffset += effectiveSlots * 4 + length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
                     tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset)
             throws HyracksDataException {
         int length = tEndOffset - tStartOffset;
@@ -101,25 +105,25 @@
             ByteBuffer src = tupleAccessor.getBuffer();
             System.arraycopy(src.array(), tStartOffset, array, tupleDataEndOffset, length);
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
                     tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
         int tStartOffset = tupleAccessor.getTupleStartOffset(tIndex);
         int tEndOffset = tupleAccessor.getTupleEndOffset(tIndex);
         return append(tupleAccessor, tStartOffset, tEndOffset);
     }
 
-    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1,
-            int tIndex1) throws HyracksDataException {
+    @Override
+    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
         int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
         int length0 = endOffset0 - startOffset0;
@@ -143,22 +147,22 @@
                         src1.getInt(startOffset1 + i * 4) + dataLen0);
             }
             // Copy data0
-            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1, dataLen0);
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
+                    dataLen0);
             // Copy data1
-            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1 + dataLen0, dataLen1);
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array,
+                    tupleDataEndOffset + slotsLen0 + slotsLen1 + dataLen0, dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1,
             int offset1, int dataLen1) throws HyracksDataException {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
@@ -176,21 +180,19 @@
             System.arraycopy(src0.array(), startOffset0, array, tupleDataEndOffset, slotsLen0);
             // Copy fieldSlots1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
             for (int i = 0; i < fieldSlots1.length; ++i) {
-                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4,
-                        (fieldSlots1[i] + dataLen0));
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots1[i] + dataLen0));
             }
             // Copy data0
-            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1, dataLen0);
+            System.arraycopy(src0.array(), startOffset0 + slotsLen0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
+                    dataLen0);
             // Copy bytes1
-            System.arraycopy(bytes1, offset1, array,
-                    tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4 + dataLen0, dataLen1);
+            System.arraycopy(bytes1, offset1, array, tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4 + dataLen0,
+                    dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
@@ -219,22 +221,21 @@
                         src1.getInt(startOffset1 + i * 4) + dataLen0);
             }
             // Copy bytes0
-            System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + slotsLen0 + slotsLen1,
-                    dataLen0);
+            System.arraycopy(bytes0, offset0, array, tupleDataEndOffset + slotsLen0 + slotsLen1, dataLen0);
             // Copy data1
-            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array, tupleDataEndOffset + slotsLen0
-                    + slotsLen1 + dataLen0, dataLen1);
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, array,
+                    tupleDataEndOffset + slotsLen0 + slotsLen1 + dataLen0, dataLen1);
             tupleDataEndOffset += (length0 + length1);
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
     }
 
+    @Override
     public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields)
             throws HyracksDataException {
         int fTargetSlotsLength = fields.length * 4;
@@ -253,18 +254,17 @@
                 int fSrcStart = tStartOffset + fSrcSlotsLength + accessor.getFieldStartOffset(tIndex, fields[i]);
                 int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
                         - accessor.getFieldStartOffset(tIndex, fields[i]);
-                System.arraycopy(accessor.getBuffer().array(), fSrcStart, array, tupleDataEndOffset
-                        + fTargetSlotsLength + fStartOffset, fLen);
+                System.arraycopy(accessor.getBuffer().array(), fSrcStart, array,
+                        tupleDataEndOffset + fTargetSlotsLength + fStartOffset, fLen);
                 fEndOffset += fLen;
                 IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fEndOffset);
                 fStartOffset = fEndOffset;
             }
             tupleDataEndOffset += length;
-            IntSerDeUtils.putInt(array,
-                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1),
+                    tupleDataEndOffset);
             ++tupleCount;
-            IntSerDeUtils
-                    .putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+            IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
             return true;
         }
         return false;
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
new file mode 100644
index 0000000..7100c11
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
+
+public class MessagingFrameTupleAppender extends FrameTupleAppender {
+
+    public static final int MAX_MESSAGE_SIZE = 100;
+    private final IHyracksTaskContext ctx;
+
+    public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
+        this.ctx = ctx;
+    }
+
+    @Override
+    protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws HyracksDataException {
+        if (hasEnoughSpace(fieldCount, dataLength + MAX_MESSAGE_SIZE)) {
+            return true;
+        }
+        if (tupleCount == 0) {
+            frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount, dataLength + MAX_MESSAGE_SIZE,
+                    frame.getMinSize()));
+            reset(frame.getBuffer(), true);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
+        if (tupleCount > 0) {
+            appendMessage();
+            getBuffer().clear();
+            outWriter.nextFrame(getBuffer());
+            if (clearFrame) {
+                frame.reset();
+                reset(getBuffer(), true);
+            }
+        }
+    }
+
+    public void appendMessage() {
+        ByteBuffer message = (ByteBuffer) ctx.getSharedObject();
+        System.arraycopy(message.array(), message.position(), array, tupleDataEndOffset, message.limit());
+        tupleDataEndOffset += message.limit();
+        IntSerDeUtils.putInt(getBuffer().array(),
+                FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+        ++tupleCount;
+        IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+    }
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
index b808ac1..d2100bb 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/IntSerDeUtils.java
@@ -26,7 +26,14 @@
                 + ((bytes[offset + 3] & 0xff) << 0);
     }
 
+    /**
+     * put integer value into the array bytes at the offset offset
+     * @param bytes byte array to put data in
+     * @param offset offset from the beginning of the array to write the {@code value} in
+     * @param value value to write to {@code bytes[offset]}
+     */
     public static void putInt(byte[] bytes, int offset, int value) {
+
         bytes[offset++] = (byte) (value >> 24);
         bytes[offset++] = (byte) (value >> 16);
         bytes[offset++] = (byte) (value >> 8);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
index cf4808f..7d97507 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractMToNConnectorDescriptor.java
@@ -44,7 +44,7 @@
     }
 
     @Override
-    public boolean allProducersToAllConsumers(){
+    public boolean allProducersToAllConsumers() {
         return true;
     }
 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
index 7856d6a..44d77ac 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
@@ -41,8 +41,8 @@
 
     private ITuplePartitionComputerFactory tpcf;
 
-    public LocalityAwareMToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf,
-            ILocalityMap localityMap) {
+    public LocalityAwareMToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITuplePartitionComputerFactory tpcf, ILocalityMap localityMap) {
         super(spec);
         this.localityMap = localityMap;
         this.tpcf = tpcf;
@@ -60,7 +60,7 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         return new LocalityAwarePartitionDataWriter(ctx, edwFactory, recordDesc, tpcf.createPartitioner(),
                 nConsumerPartitions, localityMap, index);
     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
index 22a4c1c..d5e4e20 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
@@ -35,7 +35,7 @@
 
 public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDescriptor {
     private static final long serialVersionUID = 1L;
-    private ITuplePartitionComputerFactory tpcf;
+    protected ITuplePartitionComputerFactory tpcf;
 
     public MToNPartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec, ITuplePartitionComputerFactory tpcf) {
         super(spec);
@@ -45,15 +45,13 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
-        final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory,
-                recordDesc, tpcf.createPartitioner());
-        return hashWriter;
+                    throws HyracksDataException {
+        return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
     }
 
     @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
-            int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(0, nProducerPartitions);
         NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
@@ -61,4 +59,8 @@
         NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
         return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
     }
+
+    public ITuplePartitionComputerFactory getTuplePartitionComputerFactory() {
+        return tpcf;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
new file mode 100644
index 0000000..e90d8b0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+
+public class MToNPartitioningWithMessageConnectorDescriptor extends MToNPartitioningConnectorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public MToNPartitioningWithMessageConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ITuplePartitionComputerFactory tpcf) {
+        super(spec, tpcf);
+    }
+
+    @Override
+    public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+                    throws HyracksDataException {
+        return new PartitionWithMessageDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
+                tpcf.createPartitioner());
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index cfa0cf9..dde29c1 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -48,13 +48,13 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         return edwFactory.createFrameWriter(index);
     }
 
     @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
-            int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(index);
         NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
@@ -69,8 +69,8 @@
         OperatorDescriptorId consumer = ac.getConsumerActivity(getConnectorId()).getOperatorDescriptorId();
         OperatorDescriptorId producer = ac.getProducerActivity(getConnectorId()).getOperatorDescriptorId();
 
-        constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(consumer),
-                new PartitionCountExpression(producer)));
+        constraintAcceptor.addConstraint(
+                new Constraint(new PartitionCountExpression(consumer), new PartitionCountExpression(producer)));
     }
 
     @Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 646883f..f84c3e4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -51,7 +51,7 @@
         for (int i = 0; i < consumerPartitionCount; ++i) {
             try {
                 pWriters[i] = pwFactory.createFrameWriter(i);
-                appenders[i] = new FrameTupleAppender();
+                appenders[i] = createTupleAppender(ctx);
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
@@ -61,6 +61,10 @@
         this.ctx = ctx;
     }
 
+    protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
+        return new FrameTupleAppender();
+    }
+
     @Override
     public void close() throws HyracksDataException {
         HyracksDataException closeException = null;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
new file mode 100644
index 0000000..4055fb0
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionWithMessageDataWriter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.connectors;
+
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+
+public class PartitionWithMessageDataWriter extends PartitionDataWriter {
+
+    public PartitionWithMessageDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount,
+            IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc)
+                    throws HyracksDataException {
+        super(ctx, consumerPartitionCount, pwFactory, recordDescriptor, tpc);
+    }
+
+    @Override
+    protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
+        return new MessagingFrameTupleAppender(ctx);
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index 0960927..d121ec4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -26,7 +26,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -52,7 +52,7 @@
     }
 
     @Override
-    public ITupleParser createTupleParser(final IHyracksCommonContext ctx) {
+    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
         return new ITupleParser() {
             @Override
             public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
index daf5104..f495b75 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/ITupleParserFactory.java
@@ -20,9 +20,9 @@
 
 import java.io.Serializable;
 
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface ITupleParserFactory extends Serializable {
-    public ITupleParser createTupleParser(IHyracksCommonContext ctx) throws HyracksDataException;
+    public ITupleParser createTupleParser(IHyracksTaskContext ctx) throws HyracksDataException;
 }
diff --git a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
index eb2714f..4558cf9 100644
--- a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
+++ b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/org/apache/hyracks/examples/text/WordTupleParserFactory.java
@@ -27,7 +27,7 @@
 
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -41,7 +41,7 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public ITupleParser createTupleParser(final IHyracksCommonContext ctx) {
+    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
         return new ITupleParser() {
             @Override
             public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 6d954eb..9848ffb 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -137,4 +137,13 @@
     public ExecutorService getExecutorService() {
         return null;
     }
+
+    @Override
+    public Object getSharedObject() {
+        return null;
+    }
+
+    @Override
+    public void setSharedObject(Object sharedObject) {
+    }
 }
\ No newline at end of file