Improve Messaging Connector and Add Unit Tests

Before this change, messaging connector always reserves 100 bytes
for messages which are mostly un-used. With this change, it only
reserves two bytes and sends null messages by default. In case a
new message doesn't fit in the leftover space of a frame, it sends
the frame with a null message, followed by a dedicated frame for
the message.

Change-Id: If4336e9c234e8d282798cfba9f48432b46cccfca
Reviewed-on: https://asterix-gerrit.ics.uci.edu/880
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
Reviewed-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 99f1c2f..76d05d5 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -286,5 +286,10 @@
       <artifactId>asterix-external-data</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+        <groupId>org.apache.hyracks</groupId>
+        <artifactId>hyracks-test-support</artifactId>
+        <version>0.2.18-SNAPSHOT</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
new file mode 100644
index 0000000..e267cc7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.common;
+
+import java.util.Random;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.GrowableArray;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+public class TestTupleGenerator {
+    private final int stringFieldSizes;
+    private final FieldType[] types;
+    private final boolean reuseObject;
+    private final Random random = new Random();
+    private ITupleReference tuple;
+    private UTF8StringSerializerDeserializer stringSerde =
+            new UTF8StringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+    private GrowableArray[] fields;
+
+    public enum FieldType {
+        Integer64,
+        Boolean,
+        Double,
+        String
+    }
+
+    public TestTupleGenerator(FieldType[] types, int stringFieldSizes, boolean resueObject) {
+        this.types = types;
+        this.stringFieldSizes = stringFieldSizes;
+        this.reuseObject = resueObject;
+        this.fields = new GrowableArray[types.length];
+        for (int i = 0; i < types.length; i++) {
+            fields[i] = new GrowableArray();
+        }
+        tuple = new TestTupleReference(fields);
+    }
+
+    public ITupleReference next() throws HyracksDataException {
+        if (reuseObject) {
+            for (int i = 0; i < types.length; i++) {
+                fields[i].reset();
+            }
+        } else {
+            this.fields = new GrowableArray[types.length];
+            for (int i = 0; i < types.length; i++) {
+                fields[i] = new GrowableArray();
+            }
+            tuple = new TestTupleReference(fields);
+        }
+        for (int i = 0; i < types.length; i++) {
+            FieldType type = types[i];
+            switch (type) {
+                case Boolean:
+                    Boolean aBoolean = random.nextBoolean();
+                    BooleanSerializerDeserializer.INSTANCE.serialize(aBoolean, fields[i].getDataOutput());
+                    break;
+                case Double:
+                    double aDouble = random.nextDouble();
+                    DoubleSerializerDeserializer.INSTANCE.serialize(aDouble, fields[i].getDataOutput());
+                    break;
+                case Integer64:
+                    long aLong = random.nextLong();
+                    Integer64SerializerDeserializer.INSTANCE.serialize(aLong, fields[i].getDataOutput());
+                    break;
+                case String:
+                    String aString = RandomStringUtils.randomAlphanumeric(stringFieldSizes);
+                    stringSerde.serialize(aString, fields[i].getDataOutput());
+                    break;
+                default:
+                    break;
+            }
+        }
+        return tuple;
+    }
+
+    private class TestTupleReference implements ITupleReference {
+        private final GrowableArray[] fields;
+
+        private TestTupleReference(GrowableArray[] fields) {
+            this.fields = fields;
+        }
+
+        @Override
+        public int getFieldCount() {
+            return fields.length;
+        }
+
+        @Override
+        public byte[] getFieldData(int fIdx) {
+
+            return fields[fIdx].getByteArray();
+        }
+
+        @Override
+        public int getFieldStart(int fIdx) {
+            return 0;
+        }
+
+        @Override
+        public int getFieldLength(int fIdx) {
+            return fields[fIdx].getLength();
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
new file mode 100644
index 0000000..2f712cc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.test.common.TestTupleGenerator;
+import org.apache.asterix.test.common.TestTupleGenerator.FieldType;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.api.test.TestFrameWriter;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.PartitionWithMessageDataWriter;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ConnectorDescriptorWithMessagingTest {
+
+    // Tuples used in this test are made of Fields of {Integer,Double,Boolean,UTF8String};
+    private static final int NUMBER_OF_CONSUMERS = 5;
+    private static final int DEFAULT_FRAME_SIZE = 32768;
+    private static final int CURRENT_PRODUCER = 0;
+    private static final int STRING_FIELD_SIZES = 32;
+
+    @Test
+    public void testEmptyFrames() throws Exception {
+        try {
+            List<Integer> routing = Arrays.asList(0, 1, 2, 3, 4);
+            IConnectorDescriptorRegistry connDescRegistry = Mockito.mock(IConnectorDescriptorRegistry.class);
+            ITuplePartitionComputerFactory partitionComputerFactory = new TestPartitionComputerFactory(routing);
+            MToNPartitioningWithMessageConnectorDescriptor connector =
+                    new MToNPartitioningWithMessageConnectorDescriptor(connDescRegistry, partitionComputerFactory);
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            VSizeFrame message = new VSizeFrame(ctx);
+            VSizeFrame tempBuffer = new VSizeFrame(ctx);
+            ctx.setSharedObject(message);
+            message.getBuffer().clear();
+            message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+            message.getBuffer().flip();
+            ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
+                    Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
+                    BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
+            RecordDescriptor rDesc = new RecordDescriptor(serdes);
+            TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
+            IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory, CURRENT_PRODUCER,
+                    NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+            partitioner.open();
+            FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+            List<TestFrameWriter> recipients = new ArrayList<>();
+            for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) {
+                recipients.add((TestFrameWriter) writer);
+            }
+            partitioner.flush();
+            for (TestFrameWriter writer : recipients) {
+                Assert.assertEquals(writer.nextFrameCount(), 1);
+                fta.reset(writer.getLastFrame());
+                Assert.assertEquals(fta.getTupleCount(), 1);
+                FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+                Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
+                        MessagingFrameTupleAppender.getMessageType(tempBuffer));
+            }
+            message.getBuffer().clear();
+            message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+            message.getBuffer().flip();
+            partitioner.flush();
+            for (TestFrameWriter writer : recipients) {
+                Assert.assertEquals(writer.nextFrameCount(), 2);
+                fta.reset(writer.getLastFrame());
+                Assert.assertEquals(fta.getTupleCount(), 1);
+                FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+                Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE,
+                        MessagingFrameTupleAppender.getMessageType(tempBuffer));
+            }
+
+            message.getBuffer().clear();
+            message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+            message.getBuffer().flip();
+            partitioner.flush();
+            for (TestFrameWriter writer : recipients) {
+                Assert.assertEquals(writer.nextFrameCount(), 3);
+                fta.reset(writer.getLastFrame());
+                Assert.assertEquals(fta.getTupleCount(), 1);
+                FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+                Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
+                        MessagingFrameTupleAppender.getMessageType(tempBuffer));
+            }
+            partitioner.close();
+            for (TestFrameWriter writer : recipients) {
+                Assert.assertEquals(writer.nextFrameCount(), 4);
+                Assert.assertEquals(writer.closeCount(), 1);
+            }
+
+        } catch (Throwable th) {
+            th.printStackTrace();
+            throw th;
+        }
+    }
+
+    @Test
+    public void testMessageLargerThanEmptyFrame() throws Exception {
+        try {
+            List<Integer> routing = Arrays.asList(0, 1, 2, 3, 4);
+            IConnectorDescriptorRegistry connDescRegistry = Mockito.mock(IConnectorDescriptorRegistry.class);
+            ITuplePartitionComputerFactory partitionComputerFactory = new TestPartitionComputerFactory(routing);
+            MToNPartitioningWithMessageConnectorDescriptor connector =
+                    new MToNPartitioningWithMessageConnectorDescriptor(connDescRegistry, partitionComputerFactory);
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            VSizeFrame message = new VSizeFrame(ctx);
+            VSizeFrame tempBuffer = new VSizeFrame(ctx);
+            ctx.setSharedObject(message);
+            writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE + 1);
+            ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
+                    Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
+                    BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
+            RecordDescriptor rDesc = new RecordDescriptor(serdes);
+            TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
+            IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory, CURRENT_PRODUCER,
+                    NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+            partitioner.open();
+            FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+            List<TestFrameWriter> recipients = new ArrayList<>();
+            for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) {
+                recipients.add((TestFrameWriter) writer);
+            }
+            partitioner.flush();
+            for (TestFrameWriter writer : recipients) {
+                Assert.assertEquals(writer.nextFrameCount(), 1);
+                fta.reset(writer.getLastFrame());
+                Assert.assertEquals(fta.getTupleCount(), 1);
+                FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+                Assert.assertEquals(MessagingFrameTupleAppender.SNAPSHOT_MESSAGE,
+                        MessagingFrameTupleAppender.getMessageType(tempBuffer));
+            }
+            message.getBuffer().clear();
+            message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+            message.getBuffer().flip();
+            partitioner.flush();
+            for (TestFrameWriter writer : recipients) {
+                Assert.assertEquals(writer.nextFrameCount(), 2);
+                fta.reset(writer.getLastFrame());
+                Assert.assertEquals(fta.getTupleCount(), 1);
+                FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+                Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE,
+                        MessagingFrameTupleAppender.getMessageType(tempBuffer));
+            }
+            message.getBuffer().clear();
+            message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+            message.getBuffer().flip();
+            partitioner.flush();
+            for (TestFrameWriter writer : recipients) {
+                Assert.assertEquals(writer.nextFrameCount(), 3);
+                fta.reset(writer.getLastFrame());
+                Assert.assertEquals(fta.getTupleCount(), 1);
+                FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+                Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE,
+                        MessagingFrameTupleAppender.getMessageType(tempBuffer));
+            }
+            partitioner.close();
+            for (TestFrameWriter writer : recipients) {
+                Assert.assertEquals(writer.nextFrameCount(), 4);
+                Assert.assertEquals(writer.closeCount(), 1);
+            }
+        } catch (Throwable th) {
+            th.printStackTrace();
+            throw th;
+        }
+    }
+
+    private void writeRandomMessage(VSizeFrame frame, byte tag, int size) throws HyracksDataException {
+        // We subtract 2, 1 for the tag, and one for the end offset
+        Random random = new Random();
+        byte[] bytes = new byte[size - 2];
+        random.nextBytes(bytes);
+        int frameSize = FrameHelper.calcAlignedFrameSizeToStore(1, size - 1, DEFAULT_FRAME_SIZE);
+        frame.ensureFrameSize(frameSize);
+        frame.getBuffer().clear();
+        frame.getBuffer().put(tag);
+        frame.getBuffer().put(bytes);
+        frame.getBuffer().flip();
+    }
+
+    @Test
+    public void testMessageLargerThanSome() throws Exception {
+        try {
+            // Routing will be to 1, 3, and 4 only. 0 and 2 will receive no tuples
+            List<Integer> routing = Arrays.asList(1, 3, 4);
+            IConnectorDescriptorRegistry connDescRegistry = Mockito.mock(IConnectorDescriptorRegistry.class);
+            ITuplePartitionComputerFactory partitionComputerFactory = new TestPartitionComputerFactory(routing);
+            MToNPartitioningWithMessageConnectorDescriptor connector =
+                    new MToNPartitioningWithMessageConnectorDescriptor(connDescRegistry, partitionComputerFactory);
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            VSizeFrame message = new VSizeFrame(ctx);
+            VSizeFrame tempBuffer = new VSizeFrame(ctx);
+            ctx.setSharedObject(message);
+            message.getBuffer().clear();
+            writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE);
+            ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
+                    Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
+                    BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
+            FieldType[] types = { FieldType.Integer64, FieldType.Double, FieldType.Boolean, FieldType.String };
+            RecordDescriptor rDesc = new RecordDescriptor(serdes);
+            TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
+            PartitionWithMessageDataWriter partitioner =
+                    (PartitionWithMessageDataWriter) connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
+                            CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+            partitioner.open();
+            FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+            List<TestFrameWriter> recipients = new ArrayList<>();
+            for (int i = 0; i < partitionWriterFactory.getWriters().values().size(); i++) {
+                recipients.add(partitionWriterFactory.getWriters().get(i));
+            }
+            TestTupleGenerator ttg = new TestTupleGenerator(types, STRING_FIELD_SIZES, true);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender appender = new FrameTupleAppender(frame);
+            ITupleReference tuple = ttg.next();
+            while (appender.append(tuple)) {
+                tuple = ttg.next();
+            }
+            partitioner.nextFrame(frame.getBuffer());
+            partitioner.flush();
+            Assert.assertEquals(partitionWriterFactory.getWriters().get(0).nextFrameCount(), 1);
+            Assert.assertEquals(partitionWriterFactory.getWriters().get(1).nextFrameCount(), 2);
+            Assert.assertEquals(partitionWriterFactory.getWriters().get(2).nextFrameCount(), 1);
+            Assert.assertEquals(partitionWriterFactory.getWriters().get(3).nextFrameCount(), 2);
+            Assert.assertEquals(partitionWriterFactory.getWriters().get(4).nextFrameCount(), 2);
+            for (TestFrameWriter writer : recipients) {
+                fta.reset(writer.getLastFrame());
+                Assert.assertEquals(fta.getTupleCount(), 1);
+                FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+                Assert.assertEquals(MessagingFrameTupleAppender.SNAPSHOT_MESSAGE,
+                        MessagingFrameTupleAppender.getMessageType(tempBuffer));
+            }
+            partitioner.close();
+        } catch (Throwable th) {
+            th.printStackTrace();
+            throw th;
+        }
+    }
+
+    @Test
+    public void testMessageFitsWithTuples() throws Exception {
+        try {
+            // Routing will be round robin
+            List<Integer> routing = Arrays.asList(0, 1, 2, 3, 4);
+            IConnectorDescriptorRegistry connDescRegistry = Mockito.mock(IConnectorDescriptorRegistry.class);
+            ITuplePartitionComputerFactory partitionComputerFactory = new TestPartitionComputerFactory(routing);
+            MToNPartitioningWithMessageConnectorDescriptor connector =
+                    new MToNPartitioningWithMessageConnectorDescriptor(connDescRegistry, partitionComputerFactory);
+            IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+            VSizeFrame message = new VSizeFrame(ctx);
+            VSizeFrame tempBuffer = new VSizeFrame(ctx);
+            ctx.setSharedObject(message);
+            message.getBuffer().clear();
+            message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+            message.getBuffer().flip();
+            ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
+                    Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
+                    BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
+            FieldType[] types = { FieldType.Integer64, FieldType.Double, FieldType.Boolean, FieldType.String };
+            RecordDescriptor rDesc = new RecordDescriptor(serdes);
+            TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
+            PartitionWithMessageDataWriter partitioner =
+                    (PartitionWithMessageDataWriter) connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
+                            CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+            partitioner.open();
+            FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+            List<TestFrameWriter> recipients = new ArrayList<>();
+            for (int i = 0; i < partitionWriterFactory.getWriters().values().size(); i++) {
+                recipients.add(partitionWriterFactory.getWriters().get(i));
+            }
+            TestTupleGenerator ttg = new TestTupleGenerator(types, STRING_FIELD_SIZES, true);
+            VSizeFrame frame = new VSizeFrame(ctx);
+            FrameTupleAppender appender = new FrameTupleAppender(frame);
+            for (int count = 0; count < NUMBER_OF_CONSUMERS; count++) {
+                ITupleReference tuple = ttg.next();
+                appender.append(tuple);
+            }
+            partitioner.nextFrame(frame.getBuffer());
+            partitioner.flush();
+            Assert.assertEquals(partitionWriterFactory.getWriters().get(0).nextFrameCount(), 1);
+            Assert.assertEquals(partitionWriterFactory.getWriters().get(1).nextFrameCount(), 1);
+            Assert.assertEquals(partitionWriterFactory.getWriters().get(2).nextFrameCount(), 1);
+            Assert.assertEquals(partitionWriterFactory.getWriters().get(3).nextFrameCount(), 1);
+            Assert.assertEquals(partitionWriterFactory.getWriters().get(4).nextFrameCount(), 1);
+            for (TestFrameWriter writer : recipients) {
+                fta.reset(writer.getLastFrame());
+                Assert.assertEquals(fta.getTupleCount(), 2);
+                FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+                Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE,
+                        MessagingFrameTupleAppender.getMessageType(tempBuffer));
+            }
+            partitioner.close();
+        } catch (Throwable th) {
+            th.printStackTrace();
+            throw th;
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionComputerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionComputerFactory.java
new file mode 100644
index 0000000..385f6a2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionComputerFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class TestPartitionComputerFactory implements ITuplePartitionComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final List<Integer> destinations;
+
+    /*
+     * For test purposes, this partition computer produces partitions according to a passed in
+     * list of integers.
+     */
+    public TestPartitionComputerFactory(List<Integer> destinations) {
+        this.destinations = destinations;
+    }
+
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        return new ITuplePartitionComputer() {
+            private final List<Integer> destinations =
+                    new ArrayList<Integer>(TestPartitionComputerFactory.this.destinations);
+            private Iterator<Integer> iterator = destinations.iterator();
+
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                if (destinations.size() == 0) {
+                    return 0;
+                }
+                while (!iterator.hasNext()) {
+                    iterator = destinations.iterator();
+                }
+                return iterator.next();
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionWriterFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionWriterFactory.java
new file mode 100644
index 0000000..4b4c722
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionWriterFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.TestFrameWriter;
+
+/*
+ * A partition writer factory that is used for testing partitioners
+ */
+public class TestPartitionWriterFactory implements IPartitionWriterFactory {
+    private HashMap<Integer, TestFrameWriter> writers = new HashMap<>();
+
+    @Override
+    public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+        // The created writers must retain a deep copy of the input frame
+        writers.put(receiverIndex, FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), true));
+        return writers.get(receiverIndex);
+    }
+
+    public HashMap<Integer, TestFrameWriter> getWriters() {
+        return writers;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
similarity index 70%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
rename to asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
index 68783ca..536bf3a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
@@ -16,14 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.util;
+package org.apache.asterix.test.dataflow;
 
-public class FeedMessageUtils {
-    public enum MessageType {
-        NULL,
-        ACK_REQUEST
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class TestRecordDescriptorFactory {
+    public RecordDescriptor createRecordDescriptor(ISerializerDeserializer<?>... serdes) {
+        return null;
     }
-
-    public static final byte NULL_FEED_MESSAGE = (byte) MessageType.NULL.ordinal();
-    public static final byte ACK_REQ_FEED_MESSAGE = (byte) MessageType.ACK_REQUEST.ordinal();
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index b09bef9..87daffa 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -67,18 +67,7 @@
                     continue;
                 }
                 tb.reset();
-                try {
-                    dataParser.parse(record, tb.getDataOutput());
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
-                    feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
-                    continue;
-                }
-                tb.addFieldEndOffset();
-                addMetaPart(tb, record);
-                addPrimaryKeys(tb, record);
-                tupleForwarder.addTuple(tb);
+                parseAndForward(record);
             }
         } catch (InterruptedException e) {
             //TODO: Find out what could cause an interrupted exception beside termination of a job/feed
@@ -107,6 +96,23 @@
         }
     }
 
+    private void parseAndForward(IRawRecord<? extends T> record) throws IOException {
+        synchronized (dataParser) {
+            try {
+                dataParser.parse(record, tb.getDataOutput());
+            } catch (Exception e) {
+                LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
+                feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
+                // continue the outer loop
+                return;
+            }
+            tb.addFieldEndOffset();
+            addMetaPart(tb, record);
+            addPrimaryKeys(tb, record);
+            tupleForwarder.addTuple(tb);
+        }
+    }
+
     protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index ed5e355..0d72682 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -31,9 +31,8 @@
     private final AsterixInputStream stream;
 
     public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
-            FeedLogManager feedLogManager, int numOfFields, IStreamDataParser streamParser,
-            AsterixInputStream inputStream) {
-        super(ctx, tupleForwarder, feedLogManager, numOfFields);
+            FeedLogManager feedLogManager, IStreamDataParser streamParser, AsterixInputStream inputStream) {
+        super(ctx, tupleForwarder, feedLogManager, 1);
         this.dataParser = streamParser;
         this.stream = inputStream;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index 7ae2f41..f1eb870 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -19,14 +19,12 @@
 package org.apache.asterix.external.dataflow;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import javax.annotation.Nonnull;
 
 import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.FeedMessageUtils;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -35,6 +33,7 @@
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 
 public class FeedTupleForwarder implements ITupleForwarder {
 
@@ -60,10 +59,10 @@
             this.writer = writer;
             this.appender = new FrameTupleAppender(frame);
             // Set null feed message
-            ByteBuffer message = (ByteBuffer) ctx.getSharedObject();
+            VSizeFrame message = (VSizeFrame) ctx.getSharedObject();
             // a null message
-            message.put(FeedMessageUtils.NULL_FEED_MESSAGE);
-            message.flip();
+            message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+            message.getBuffer().flip();
             initialized = true;
         }
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index 5e8c022..6cdc2af 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -18,16 +18,15 @@
  */
 package org.apache.asterix.external.feed.runtime;
 
-import java.nio.ByteBuffer;
 import java.util.logging.Level;
 
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
 import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 
 public class IngestionRuntime extends SubscribableRuntime {
 
@@ -48,7 +47,7 @@
         dWriter.subscribe(collector);
         subscribers.add(collectionRuntime);
         if (numSubscribers == 0) {
-            ctx.setSharedObject(ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE));
+            ctx.setSharedObject(new VSizeFrame(ctx));
             collectionRuntime.getCtx().setSharedObject(ctx.getSharedObject());
             adapterRuntimeManager.start();
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 716468e..8d8bc28 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -34,13 +34,13 @@
 import org.apache.asterix.external.feed.runtime.FeedRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
 import org.apache.asterix.external.util.FeedUtils;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 /*
@@ -86,7 +86,7 @@
 
     private final FeedRuntimeType runtimeType = FeedRuntimeType.COMPUTE;
 
-    private ByteBuffer message = ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE);
+    private final VSizeFrame message;
 
     private final FeedMetaOperatorDescriptor opDesc;
 
@@ -111,6 +111,7 @@
         this.connectionId = feedConnectionId;
         this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
+        this.message = new VSizeFrame(ctx);
         ctx.setSharedObject(message);
         this.opDesc = feedMetaOperatorDescriptor;
         this.recordDescProvider = recordDescProvider;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index b79707b..47df39e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -35,13 +35,13 @@
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -87,7 +87,7 @@
 
     private final String targetId;
 
-    private final ByteBuffer message = ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE);
+    private final VSizeFrame message;
 
     private final IRecordDescriptorProvider recordDescProvider;
 
@@ -106,6 +106,7 @@
         this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
         this.targetId = targetId;
+        this.message = new VSizeFrame(ctx);
         ctx.setSharedObject(message);
         this.recordDescProvider = recordDescProvider;
         this.opDesc = feedMetaOperatorDescriptor;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 6ba27d8..50ebb71 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -48,7 +48,6 @@
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -73,8 +72,8 @@
                                 DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
                                 recordReader, ((IIndexingDatasource) recordReader).getIndexer());
                     } else if (isFeed) {
-                        FeedTupleForwarder tupleForwarder = (FeedTupleForwarder) DataflowUtils
-                                .getTupleForwarder(configuration, feedLogManager);
+                        FeedTupleForwarder tupleForwarder =
+                                (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager);
                         boolean isChangeFeed = ExternalDataUtils.isChangeFeed(configuration);
                         boolean isRecordWithMeta = ExternalDataUtils.isRecordWithMeta(configuration);
                         if (isRecordWithMeta) {
@@ -108,7 +107,7 @@
                     if (isFeed) {
                         return new FeedStreamDataFlowController(ctx,
                                 (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager),
-                                feedLogManager, FeedUtils.getNumOfFields(configuration), streamParser, stream);
+                                feedLogManager, streamParser, stream);
                     } else {
                         return new StreamDataFlowController(ctx, DataflowUtils.getTupleForwarder(configuration, null),
                                 streamParser);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 51e7e72..8228c39 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -32,13 +32,14 @@
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
 import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.util.IntSerDeUtils;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.util.IntSerDeUtils;
 
 public class FeedUtils {
     private static String prepareDataverseFeedName(String dataverseName, String feedName) {
@@ -49,8 +50,8 @@
             ClusterPartition partition) {
         File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
         String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
-        String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
-                partition.getPartitionId());
+        String storagePartitionPath =
+                StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition.getPartitionId());
         // Note: feed adapter instances in a single node share the feed logger
         // format: 'storage dir name'/partition_#/dataverse/feed/node
         File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + nodeName);
@@ -88,22 +89,21 @@
                 feedLogFileSplit.getIODeviceId(), ctx.getIOManager()).getFile());
     }
 
-    public static void processFeedMessage(ByteBuffer input, ByteBuffer message, FrameTupleAccessor fta) {
+    public static void processFeedMessage(ByteBuffer input, VSizeFrame message, FrameTupleAccessor fta)
+            throws HyracksDataException {
         // read the message and reduce the number of tuples
         fta.reset(input);
         int tc = fta.getTupleCount() - 1;
         int offset = fta.getTupleStartOffset(tc);
         int len = fta.getTupleLength(tc);
-        message.clear();
-        message.put(input.array(), offset, len);
-        message.flip();
+        int newSize = FrameHelper.calcAlignedFrameSizeToStore(1, len, message.getMinSize());
+        message.ensureFrameSize(newSize);
+        message.getBuffer().clear();
+        message.getBuffer().put(input.array(), offset, len);
+        message.getBuffer().flip();
         IntSerDeUtils.putInt(input.array(), FrameHelper.getTupleCountOffset(input.capacity()), tc);
     }
 
-    public static int getNumOfFields(Map<String, String> configuration) {
-        return 1;
-    }
-
     public static String getFeedMetaTypeName(Map<String, String> configuration) {
         return configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME);
 
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index c00db7a..3becd96 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -118,7 +118,8 @@
             FeedPolicyAccessor fpa =
                     createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            TestFrameWriter writer =
+                    FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -159,7 +160,8 @@
             FeedPolicyAccessor fpa =
                     createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            TestFrameWriter writer =
+                    FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -207,7 +209,7 @@
             FeedPolicyAccessor fpa =
                     createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool =
@@ -317,7 +319,7 @@
             FeedPolicyAccessor fpa =
                     createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool =
@@ -390,7 +392,7 @@
             // Spill budget = Memory budget, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool =
@@ -452,7 +454,7 @@
             // Spill budget = Memory budget, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool =
@@ -509,7 +511,7 @@
             FeedPolicyAccessor fpa =
                     createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
@@ -553,7 +555,8 @@
             // No spill, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            TestFrameWriter writer =
+                    FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -593,7 +596,8 @@
             // No spill, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            TestFrameWriter writer =
+                    FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
@@ -632,7 +636,7 @@
             // No spill, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
@@ -682,7 +686,7 @@
             FeedPolicyAccessor fpa =
                     createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
@@ -732,7 +736,7 @@
             // No spill, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
index 4bddfa9..c9cc71e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
@@ -33,7 +33,7 @@
     }
 
     public static TestFrameWriter create(Collection<FrameWriterOperation> exceptionThrowingOperations,
-            Collection<FrameWriterOperation> errorThrowingOperations) {
+            Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
         CountAnswer openAnswer =
                 createAnswer(FrameWriterOperation.Open, exceptionThrowingOperations, errorThrowingOperations);
         CountAnswer nextAnswer =
@@ -44,7 +44,7 @@
                 createAnswer(FrameWriterOperation.Fail, exceptionThrowingOperations, errorThrowingOperations);
         CountAnswer closeAnswer =
                 createAnswer(FrameWriterOperation.Close, exceptionThrowingOperations, errorThrowingOperations);
-        return new TestFrameWriter(openAnswer, nextAnswer, flushAnswer, failAnswer, closeAnswer);
+        return new TestFrameWriter(openAnswer, nextAnswer, flushAnswer, failAnswer, closeAnswer, deepCopyInputFrames);
     }
 
     public static CountAnswer createAnswer(FrameWriterOperation operation,
@@ -59,7 +59,7 @@
         }
     }
 
-    public static TestControlledFrameWriter create(int initialFrameSize) {
-        return new TestControlledFrameWriter(initialFrameSize);
+    public static TestControlledFrameWriter create(int initialFrameSize, boolean deepCopyInputFrames) {
+        return new TestControlledFrameWriter(initialFrameSize, deepCopyInputFrames);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
index bf168c2..b98bcf7 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
@@ -30,8 +30,9 @@
     private volatile int currentMultiplier = 0;
     private volatile int kicks = 0;
 
-    public TestControlledFrameWriter(int initialFrameSize) {
-        super(new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer());
+    public TestControlledFrameWriter(int initialFrameSize, boolean deepCopyInputFrames) {
+        super(new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer(),
+                deepCopyInputFrames);
         this.initialFrameSize = initialFrameSize;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
index b3492fe..065e64d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
@@ -29,19 +29,27 @@
     private final CountAnswer flushAnswer;
     private final CountAnswer failAnswer;
     private final CountAnswer closeAnswer;
+    private static final int BYTES32KB = 32768;
     private long openDuration = 0L;
     private long nextDuration = 0L;
     private long flushDuration = 0L;
     private long failDuration = 0L;
     private long closeDuration = 0L;
+    // If copyFrames was set, we take a copy of the frame, otherwise, we simply point lastFrame to it
+    private final boolean deepCopyFrames;
+    private ByteBuffer lastFrame;
 
     public TestFrameWriter(CountAnswer openAnswer, CountAnswer nextAnswer, CountAnswer flushAnswer,
-            CountAnswer failAnswer, CountAnswer closeAnswer) {
+            CountAnswer failAnswer, CountAnswer closeAnswer, boolean deepCopyFrames) {
         this.openAnswer = openAnswer;
         this.nextAnswer = nextAnswer;
         this.closeAnswer = closeAnswer;
         this.flushAnswer = flushAnswer;
         this.failAnswer = failAnswer;
+        this.deepCopyFrames = deepCopyFrames;
+        if (deepCopyFrames) {
+            lastFrame = ByteBuffer.allocate(BYTES32KB);
+        }
     }
 
     @Override
@@ -56,6 +64,15 @@
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        if (deepCopyFrames) {
+            if (lastFrame.capacity() != buffer.capacity()) {
+                lastFrame = ByteBuffer.allocate(buffer.capacity());
+            }
+            lastFrame.clear();
+            lastFrame.put(buffer.array());
+        } else {
+            lastFrame = buffer;
+        }
         delay(nextDuration);
         nextAnswer.call();
     }
@@ -170,4 +187,8 @@
             }
         }
     }
+
+    public ByteBuffer getLastFrame() {
+        return lastFrame;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 1623035..ef11b5b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameTupleAppender;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.util.IntSerDeUtils;
 
 public class FrameTupleAppender extends AbstractFrameAppender implements IFrameTupleAppender {
@@ -61,6 +62,30 @@
         return false;
     }
 
+    public boolean append(ITupleReference tuple) throws HyracksDataException {
+        int tupleSize = 0;
+        for (int i = 0; i < tuple.getFieldCount(); i++) {
+            tupleSize += tuple.getFieldLength(i);
+        }
+        if (canHoldNewTuple(tuple.getFieldCount(), tupleSize)) {
+            int offset = 0;
+            for (int i = 0; i < tuple.getFieldCount(); ++i) {
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, offset);
+                System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), array,
+                        tupleDataEndOffset + tuple.getFieldCount() * 4, tuple.getFieldLength(i));
+                offset += tuple.getFieldLength(i);
+            }
+            tupleDataEndOffset += tuple.getFieldCount() * 4 + tupleSize;
+            IntSerDeUtils.putInt(getBuffer().array(),
+                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()),
+                    tupleCount);
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public boolean append(byte[] bytes, int offset, int length) throws HyracksDataException {
         if (canHoldNewTuple(0, length)) {
@@ -252,8 +277,8 @@
             int fEndOffset = 0;
             for (int i = 0; i < fields.length; ++i) {
                 int fSrcStart = tStartOffset + fSrcSlotsLength + accessor.getFieldStartOffset(tIndex, fields[i]);
-                int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
-                        - accessor.getFieldStartOffset(tIndex, fields[i]);
+                int fLen =
+                        accessor.getFieldEndOffset(tIndex, fields[i]) - accessor.getFieldStartOffset(tIndex, fields[i]);
                 System.arraycopy(accessor.getBuffer().array(), fSrcStart, array,
                         tupleDataEndOffset + fTargetSlotsLength + fStartOffset, fLen);
                 fEndOffset += fLen;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
index 77495dd..e57e12d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
@@ -71,19 +71,6 @@
         FrameUtils.appendSkipEmptyFieldToWriter(outputWriter, frameTupleAppender, fieldSlots, bytes, offset, length);
     }
 
-    public void append(byte[] bytes, int offset, int length) throws HyracksDataException {
-        FrameUtils.appendToWriter(outputWriter, frameTupleAppender, bytes, offset, length);
-    }
-
-    public void append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset)
-            throws HyracksDataException {
-        FrameUtils.appendToWriter(outputWriter, frameTupleAppender, tupleAccessor, tStartOffset, tEndOffset);
-    }
-
-    public void append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
-        FrameUtils.appendToWriter(outputWriter, frameTupleAppender, tupleAccessor, tIndex);
-    }
-
     public void appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
             throws HyracksDataException {
         FrameUtils.appendConcatToWriter(outputWriter, frameTupleAppender, accessor0, tIndex0, accessor1, tIndex1);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index 345c506..cae659d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -18,32 +18,81 @@
  */
 package org.apache.hyracks.dataflow.common.io;
 
+import java.io.PrintStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.util.IntSerDeUtils;
 
+/**
+ * A frame tuple appender that appends messages stored in the task context when pushing frames forward
+ * This appender must only be used on network boundary
+ */
 public class MessagingFrameTupleAppender extends FrameTupleAppender {
 
-    public static final int MAX_MESSAGE_SIZE = 100;
     private final IHyracksTaskContext ctx;
+    private static final int NULL_MESSAGE_SIZE = 1;
+    public static final byte NULL_FEED_MESSAGE = 0x01;
+    public static final byte ACK_REQ_FEED_MESSAGE = 0x02;
+    public static final byte SNAPSHOT_MESSAGE = 0x03;
 
     public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
         this.ctx = ctx;
     }
 
+    public static void printMessage(VSizeFrame message, PrintStream out) throws HyracksDataException {
+        out.println(getMessageString(message));
+    }
+
+    public static String getMessageString(VSizeFrame message) throws HyracksDataException {
+        StringBuilder aString = new StringBuilder();
+        aString.append("Message Type: ");
+        switch (getMessageType(message)) {
+            case NULL_FEED_MESSAGE:
+                aString.append("Null, ");
+                break;
+            case ACK_REQ_FEED_MESSAGE:
+                aString.append("Ack Request, ");
+                break;
+            case SNAPSHOT_MESSAGE:
+                aString.append("Snapshot, ");
+                break;
+            default:
+                aString.append("Unknown, ");
+                break;
+        }
+        aString.append("Message Length: ");
+        int messageLength = message.getBuffer().remaining();
+        aString.append(messageLength);
+        return aString.toString();
+    }
+
+    public static byte getMessageType(VSizeFrame message) throws HyracksDataException {
+        switch (message.getBuffer().array()[0]) {
+            case NULL_FEED_MESSAGE:
+                return NULL_FEED_MESSAGE;
+            case ACK_REQ_FEED_MESSAGE:
+                return ACK_REQ_FEED_MESSAGE;
+            case SNAPSHOT_MESSAGE:
+                return SNAPSHOT_MESSAGE;
+            default:
+                throw new HyracksDataException("Unknown message type");
+        }
+    }
+
     @Override
     protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws HyracksDataException {
-        if (hasEnoughSpace(fieldCount, dataLength + MAX_MESSAGE_SIZE)) {
+        if (hasEnoughSpace(fieldCount + 1, dataLength + NULL_MESSAGE_SIZE)) {
             return true;
         }
         if (tupleCount == 0) {
-            frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount, dataLength + MAX_MESSAGE_SIZE,
-                    frame.getMinSize()));
+            frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount + 1,
+                    dataLength + NULL_MESSAGE_SIZE, frame.getMinSize()));
             reset(frame.getBuffer(), true);
             return true;
         }
@@ -52,13 +101,32 @@
 
     @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
-        appendMessage((ByteBuffer) ctx.getSharedObject());
+        // If message fits, we append it, otherwise, we append a null message, then send a message only
+        // frame with the message
+        ByteBuffer message = ((VSizeFrame) ctx.getSharedObject()).getBuffer();
+        int messageSize = message.limit() - message.position();
+        if (hasEnoughSpace(1, messageSize)) {
+            appendMessage(message);
+            forward(outWriter);
+        } else {
+            if (tupleCount > 0) {
+                appendNullMessage();
+                forward(outWriter);
+            }
+            if (!hasEnoughSpace(1, messageSize)) {
+                frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize()));
+                reset(frame.getBuffer(), true);
+            }
+            appendMessage(message);
+            forward(outWriter);
+        }
+    }
+
+    private void forward(IFrameWriter outWriter) throws HyracksDataException {
         getBuffer().clear();
         outWriter.nextFrame(getBuffer());
-        if (clearFrame) {
-            frame.reset();
-            reset(getBuffer(), true);
-        }
+        frame.reset();
+        reset(getBuffer(), true);
     }
 
     private void appendMessage(ByteBuffer message) {
@@ -69,4 +137,13 @@
         ++tupleCount;
         IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
     }
+
+    private void appendNullMessage() {
+        array[tupleDataEndOffset] = NULL_FEED_MESSAGE;
+        tupleDataEndOffset++;
+        IntSerDeUtils.putInt(getBuffer().array(),
+                FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+        ++tupleCount;
+        IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
index e90d8b0..f6996f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
@@ -30,6 +30,11 @@
 
     private static final long serialVersionUID = 1L;
 
+    /**
+     * This connector enable sending messages alongside data tuples. Messages are sent on flush() calls.
+     * It broadcasts messages to all consumers. If the message doesn't fit in the current frame for a specific
+     * receiver, the current frame is sent and a subsequent one with the message only is sent
+     */
     public MToNPartitioningWithMessageConnectorDescriptor(IConnectorDescriptorRegistry spec,
             ITuplePartitionComputerFactory tpcf) {
         super(spec, tpcf);
@@ -38,7 +43,7 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         return new PartitionWithMessageDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
                 tpcf.createPartitioner());
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 98bd860..f2ec64b 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -44,6 +44,7 @@
     private WorkspaceFileFactory fileFactory;
 
     private Map<Object, IStateObject> stateObjectMap = new HashMap<>();
+    private Object sharedObject;
 
     public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) {
         this.jobletContext = jobletContext;
@@ -149,10 +150,11 @@
 
     @Override
     public Object getSharedObject() {
-        return null;
+        return sharedObject;
     }
 
     @Override
     public void setSharedObject(Object sharedObject) {
+        this.sharedObject = sharedObject;
     }
 }