diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index 7da4db7..b72018a 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -54,6 +54,8 @@
     protected final long[] longHashes;
     protected final LogRecord logRecord;
     protected final FrameTupleReference frameTupleReference;
+    protected final IHyracksTaskContext ctx;
+
     protected ITransactionContext transactionContext;
     protected FrameTupleAccessor frameTupleAccessor;
 
@@ -61,6 +63,7 @@
             boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction) {
         IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
+        this.ctx = ctx;
         this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
         this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
         this.jobId = jobId;
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
index 27f4fcb..afd8920 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
@@ -21,7 +21,7 @@
 import java.util.Map;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
@@ -38,7 +38,7 @@
 
     public void configure(Map<String, String> configuration) throws HyracksDataException;
 
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
 
     public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
index 116ec09..5deaef0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
@@ -28,7 +28,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -69,7 +69,7 @@
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
         this.appender = new FrameTupleAppender();
         this.frame = new VSizeFrame(ctx);
         appender.reset(frame, true);
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index b46a338..34a0207 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -18,14 +18,16 @@
  */
 package org.apache.asterix.external.dataflow;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 
 import org.apache.asterix.common.parse.ITupleForwarder;
 import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.external.util.FeedMessageUtils;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -43,10 +45,15 @@
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
         this.frame = new VSizeFrame(ctx);
         this.writer = writer;
         this.appender = new FrameTupleAppender(frame);
+        // Set null feed message
+        ByteBuffer message = (ByteBuffer) ctx.getSharedObject();
+        // a null message
+        message.put(FeedMessageUtils.NULL_FEED_MESSAGE);
+        message.flip();
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
index 36d41b4..eefc8c2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -42,7 +42,7 @@
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
         this.appender = new FrameTupleAppender();
         this.frame = new VSizeFrame(ctx);
         this.writer = writer;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
index 99cc3d1..186ca80 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -50,7 +50,7 @@
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
         this.appender = new FrameTupleAppender();
         this.frame = new VSizeFrame(ctx);
         this.writer = writer;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
index 967dc3e..8249fa6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * Represents the feed runtime that collects feed tuples from another feed.
@@ -40,14 +41,16 @@
     private final ISubscribableRuntime sourceRuntime;       // Runtime that provides the data
     private final Map<String, String> feedPolicy;           // Policy associated with the feed
     private FeedFrameCollector frameCollector;              // Collector that can be plugged into a frame distributor
+    private final IHyracksTaskContext ctx;
 
     public CollectionRuntime(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
             FeedRuntimeInputHandler inputSideHandler, IFrameWriter outputSideWriter, ISubscribableRuntime sourceRuntime,
-            Map<String, String> feedPolicy) {
+            Map<String, String> feedPolicy, IHyracksTaskContext ctx) {
         super(runtimeId, inputSideHandler, outputSideWriter);
         this.connectionId = connectionId;
         this.sourceRuntime = sourceRuntime;
         this.feedPolicy = feedPolicy;
+        this.ctx = ctx;
     }
 
     public State waitTillCollectionOver() throws InterruptedException {
@@ -93,4 +96,7 @@
         return frameCollector;
     }
 
+    public IHyracksTaskContext getCtx() {
+        return ctx;
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index fd6fcb3..34cb575 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.feed.runtime;
 
+import java.nio.ByteBuffer;
 import java.util.logging.Level;
 
 import org.apache.asterix.external.api.IAdapterRuntimeManager;
@@ -26,16 +27,20 @@
 import org.apache.asterix.external.feed.dataflow.FrameDistributor;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 
 public class IngestionRuntime extends SubscribableRuntime {
 
     private final IAdapterRuntimeManager adapterRuntimeManager;
+    private final IHyracksTaskContext ctx;
 
     public IngestionRuntime(FeedId feedId, FeedRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
-            RecordDescriptor recordDesc, IAdapterRuntimeManager adaptorRuntimeManager) {
+            RecordDescriptor recordDesc, IAdapterRuntimeManager adaptorRuntimeManager, IHyracksTaskContext ctx) {
         super(feedId, runtimeId, null, feedWriter, recordDesc);
         this.adapterRuntimeManager = adaptorRuntimeManager;
+        this.ctx = ctx;
     }
 
     @Override
@@ -45,12 +50,14 @@
         collectionRuntime.setFrameCollector(reader);
 
         if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.SINGLE)) {
+            ctx.setSharedObject(ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE));
             adapterRuntimeManager.start();
         }
         subscribers.add(collectionRuntime);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Subscribed feed collection [" + collectionRuntime + "] to " + this);
         }
+        collectionRuntime.getCtx().setSharedObject(ctx.getSharedObject());
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 8916af6..7901f03 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -146,7 +146,7 @@
                 policyAccessor, false, tupleAccessor, recordDesc, feedManager, nPartitions);
 
         collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, outputSideWriter,
-                sourceRuntime, feedPolicy);
+                sourceRuntime, feedPolicy, ctx);
         feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
         sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
     }
@@ -180,7 +180,7 @@
                 new FrameTupleAccessor(recordDesc), recordDesc, feedManager, nPartitions);
 
         collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, wrapper, sourceRuntime,
-                feedPolicy);
+                feedPolicy, ctx);
         feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
         recordDesc = sourceRuntime.getRecordDescriptor();
         sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index b31f2bf..9398fa1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -28,11 +28,12 @@
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IAdapterRuntimeManager;
 import org.apache.asterix.external.api.IAdapterRuntimeManager.State;
+import org.apache.asterix.external.api.IFeedAdapter;
 import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
 import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -40,7 +41,6 @@
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
 import org.apache.asterix.external.feed.runtime.IngestionRuntime;
 import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
-import org.apache.asterix.external.api.IFeedAdapter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -103,7 +103,7 @@
                 SubscribableFeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
                         partition);
                 ingestionRuntime = new IngestionRuntime(feedId, runtimeId, feedFrameWriter, recordDesc,
-                        adapterRuntimeManager);
+                        adapterRuntimeManager, ctx);
                 feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
                 feedFrameWriter.open();
             } else {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 3c4c9ad..9929358 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -33,12 +33,15 @@
 import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
 import org.apache.asterix.external.feed.runtime.FeedRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -88,6 +91,8 @@
 
     private FeedRuntimeInputHandler inputSideHandler;
 
+    private ByteBuffer message = ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE);
+
     public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
             int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
             Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
@@ -101,6 +106,7 @@
         this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
         this.operandId = operationId;
+        ctx.setSharedObject(message);
     }
 
     @Override
@@ -116,7 +122,7 @@
 
             coreOperator.open();
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.log(Level.WARNING, "Failed to open feed store operator", e);
             throw new HyracksDataException(e);
         }
     }
@@ -161,6 +167,7 @@
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         try {
+            processFeedMessage(buffer);
             inputSideHandler.nextFrame(buffer);
         } catch (Exception e) {
             e.printStackTrace();
@@ -168,6 +175,18 @@
         }
     }
 
+    private void processFeedMessage(ByteBuffer buffer) {
+        // read the message and reduce the number of tuples
+        fta.reset(buffer);
+        int tc = fta.getTupleCount() - 1;
+        int offset = fta.getTupleStartOffset(tc);
+        int len = fta.getTupleLength(tc);
+        message.clear();
+        message.put(buffer.array(), offset, len);
+        message.flip();
+        IntSerDeUtils.putInt(buffer.array(), FrameHelper.getTupleCountOffset(buffer.capacity()), tc);
+    }
+
     @Override
     public void fail() throws HyracksDataException {
         if (LOGGER.isLoggable(Level.WARNING)) {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
new file mode 100644
index 0000000..4175ce1
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+public class FeedMessageUtils {
+    public enum MessageType {
+        NULL,
+        ACK_REQUEST
+    }
+
+    public static final byte NULL_FEED_MESSAGE = (byte) MessageType.NULL.ordinal();
+    public static final byte ACK_REQ_FEED_MESSAGE = (byte) MessageType.ACK_REQUEST.ordinal();;
+}
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 5346bf2..6921392 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -31,7 +31,6 @@
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -64,7 +63,7 @@
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ITupleParser createTupleParser(final IHyracksCommonContext ctx) throws HyracksDataException {
+            public ITupleParser createTupleParser(final IHyracksTaskContext ctx) throws HyracksDataException {
                 ADMDataParser parser;
                 ITupleForwarder forwarder;
                 ArrayTupleBuilder tb;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index c69cd16..7ef51cb 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -84,6 +84,7 @@
 import org.apache.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
 
 /**
  * A utility class for providing helper functions for feeds
@@ -212,6 +213,11 @@
         Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorMapping = new HashMap<ConnectorDescriptorId, ConnectorDescriptorId>();
         for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) {
             IConnectorDescriptor connDesc = entry.getValue();
+            if (connDesc instanceof MToNPartitioningConnectorDescriptor) {
+                MToNPartitioningConnectorDescriptor m2nConn = (MToNPartitioningConnectorDescriptor) connDesc;
+                connDesc = new MToNPartitioningWithMessageConnectorDescriptor(altered,
+                        m2nConn.getTuplePartitionComputerFactory());
+            }
             ConnectorDescriptorId newConnId = altered.createConnectorDescriptor(connDesc);
             connectorMapping.put(entry.getKey(), newConnId);
         }
