Support Sending Messages Alongside Frame Data

This change supports sending messages with records. The tuple Appender
reserves 100 bytes for a message. Before sending the frame, it appends
The message in the last tuple position. The message is read from the
task context as the shared object between different operators in the
pipeline. The first use of this feature will be within feeds to request
acks for at least once semantics.

Change-Id: Iaa23e9f8a909ddcafc1c3ee95181092eb04ee1ad
Reviewed-on: https://asterix-gerrit.ics.uci.edu/605
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index 7da4db7..b72018a 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -54,6 +54,8 @@
     protected final long[] longHashes;
     protected final LogRecord logRecord;
     protected final FrameTupleReference frameTupleReference;
+    protected final IHyracksTaskContext ctx;
+
     protected ITransactionContext transactionContext;
     protected FrameTupleAccessor frameTupleAccessor;
 
@@ -61,6 +63,7 @@
             boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction) {
         IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
+        this.ctx = ctx;
         this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
         this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
         this.jobId = jobId;
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
index 27f4fcb..afd8920 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
@@ -21,7 +21,7 @@
 import java.util.Map;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
@@ -38,7 +38,7 @@
 
     public void configure(Map<String, String> configuration) throws HyracksDataException;
 
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
 
     public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
index 116ec09..5deaef0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
@@ -28,7 +28,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -69,7 +69,7 @@
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
         this.appender = new FrameTupleAppender();
         this.frame = new VSizeFrame(ctx);
         appender.reset(frame, true);
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index b46a338..34a0207 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -18,14 +18,16 @@
  */
 package org.apache.asterix.external.dataflow;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 
 import org.apache.asterix.common.parse.ITupleForwarder;
 import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.external.util.FeedMessageUtils;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -43,10 +45,15 @@
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
         this.frame = new VSizeFrame(ctx);
         this.writer = writer;
         this.appender = new FrameTupleAppender(frame);
+        // Set null feed message
+        ByteBuffer message = (ByteBuffer) ctx.getSharedObject();
+        // a null message
+        message.put(FeedMessageUtils.NULL_FEED_MESSAGE);
+        message.flip();
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
index 36d41b4..eefc8c2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -42,7 +42,7 @@
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
         this.appender = new FrameTupleAppender();
         this.frame = new VSizeFrame(ctx);
         this.writer = writer;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
index 99cc3d1..186ca80 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -50,7 +50,7 @@
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
         this.appender = new FrameTupleAppender();
         this.frame = new VSizeFrame(ctx);
         this.writer = writer;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
index 967dc3e..8249fa6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * Represents the feed runtime that collects feed tuples from another feed.
@@ -40,14 +41,16 @@
     private final ISubscribableRuntime sourceRuntime;       // Runtime that provides the data
     private final Map<String, String> feedPolicy;           // Policy associated with the feed
     private FeedFrameCollector frameCollector;              // Collector that can be plugged into a frame distributor
+    private final IHyracksTaskContext ctx;
 
     public CollectionRuntime(FeedConnectionId connectionId, FeedRuntimeId runtimeId,
             FeedRuntimeInputHandler inputSideHandler, IFrameWriter outputSideWriter, ISubscribableRuntime sourceRuntime,
-            Map<String, String> feedPolicy) {
+            Map<String, String> feedPolicy, IHyracksTaskContext ctx) {
         super(runtimeId, inputSideHandler, outputSideWriter);
         this.connectionId = connectionId;
         this.sourceRuntime = sourceRuntime;
         this.feedPolicy = feedPolicy;
+        this.ctx = ctx;
     }
 
     public State waitTillCollectionOver() throws InterruptedException {
@@ -93,4 +96,7 @@
         return frameCollector;
     }
 
+    public IHyracksTaskContext getCtx() {
+        return ctx;
+    }
 }
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index fd6fcb3..34cb575 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.feed.runtime;
 
+import java.nio.ByteBuffer;
 import java.util.logging.Level;
 
 import org.apache.asterix.external.api.IAdapterRuntimeManager;
@@ -26,16 +27,20 @@
 import org.apache.asterix.external.feed.dataflow.FrameDistributor;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 
 public class IngestionRuntime extends SubscribableRuntime {
 
     private final IAdapterRuntimeManager adapterRuntimeManager;
+    private final IHyracksTaskContext ctx;
 
     public IngestionRuntime(FeedId feedId, FeedRuntimeId runtimeId, DistributeFeedFrameWriter feedWriter,
-            RecordDescriptor recordDesc, IAdapterRuntimeManager adaptorRuntimeManager) {
+            RecordDescriptor recordDesc, IAdapterRuntimeManager adaptorRuntimeManager, IHyracksTaskContext ctx) {
         super(feedId, runtimeId, null, feedWriter, recordDesc);
         this.adapterRuntimeManager = adaptorRuntimeManager;
+        this.ctx = ctx;
     }
 
     @Override
@@ -45,12 +50,14 @@
         collectionRuntime.setFrameCollector(reader);
 
         if (dWriter.getDistributionMode().equals(FrameDistributor.DistributionMode.SINGLE)) {
+            ctx.setSharedObject(ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE));
             adapterRuntimeManager.start();
         }
         subscribers.add(collectionRuntime);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Subscribed feed collection [" + collectionRuntime + "] to " + this);
         }
+        collectionRuntime.getCtx().setSharedObject(ctx.getSharedObject());
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 8916af6..7901f03 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -146,7 +146,7 @@
                 policyAccessor, false, tupleAccessor, recordDesc, feedManager, nPartitions);
 
         collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, outputSideWriter,
-                sourceRuntime, feedPolicy);
+                sourceRuntime, feedPolicy, ctx);
         feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
         sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
     }
@@ -180,7 +180,7 @@
                 new FrameTupleAccessor(recordDesc), recordDesc, feedManager, nPartitions);
 
         collectRuntime = new CollectionRuntime(connectionId, runtimeId, inputSideHandler, wrapper, sourceRuntime,
-                feedPolicy);
+                feedPolicy, ctx);
         feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, collectRuntime);
         recordDesc = sourceRuntime.getRecordDescriptor();
         sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index b31f2bf..9398fa1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -28,11 +28,12 @@
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IAdapterRuntimeManager;
 import org.apache.asterix.external.api.IAdapterRuntimeManager.State;
+import org.apache.asterix.external.api.IFeedAdapter;
 import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
 import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -40,7 +41,6 @@
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
 import org.apache.asterix.external.feed.runtime.IngestionRuntime;
 import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
-import org.apache.asterix.external.api.IFeedAdapter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -103,7 +103,7 @@
                 SubscribableFeedRuntimeId runtimeId = new SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
                         partition);
                 ingestionRuntime = new IngestionRuntime(feedId, runtimeId, feedFrameWriter, recordDesc,
-                        adapterRuntimeManager);
+                        adapterRuntimeManager, ctx);
                 feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
                 feedFrameWriter.open();
             } else {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 3c4c9ad..9929358 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -33,12 +33,15 @@
 import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
 import org.apache.asterix.external.feed.runtime.FeedRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -88,6 +91,8 @@
 
     private FeedRuntimeInputHandler inputSideHandler;
 
+    private ByteBuffer message = ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE);
+
     public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
             int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
             Map<String, String> feedPolicyProperties, String operationId) throws HyracksDataException {
@@ -101,6 +106,7 @@
         this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
         this.operandId = operationId;
+        ctx.setSharedObject(message);
     }
 
     @Override
@@ -116,7 +122,7 @@
 
             coreOperator.open();
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.log(Level.WARNING, "Failed to open feed store operator", e);
             throw new HyracksDataException(e);
         }
     }
@@ -161,6 +167,7 @@
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         try {
+            processFeedMessage(buffer);
             inputSideHandler.nextFrame(buffer);
         } catch (Exception e) {
             e.printStackTrace();
@@ -168,6 +175,18 @@
         }
     }
 
+    private void processFeedMessage(ByteBuffer buffer) {
+        // read the message and reduce the number of tuples
+        fta.reset(buffer);
+        int tc = fta.getTupleCount() - 1;
+        int offset = fta.getTupleStartOffset(tc);
+        int len = fta.getTupleLength(tc);
+        message.clear();
+        message.put(buffer.array(), offset, len);
+        message.flip();
+        IntSerDeUtils.putInt(buffer.array(), FrameHelper.getTupleCountOffset(buffer.capacity()), tc);
+    }
+
     @Override
     public void fail() throws HyracksDataException {
         if (LOGGER.isLoggable(Level.WARNING)) {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
new file mode 100644
index 0000000..4175ce1
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+public class FeedMessageUtils {
+    public enum MessageType {
+        NULL,
+        ACK_REQUEST
+    }
+
+    public static final byte NULL_FEED_MESSAGE = (byte) MessageType.NULL.ordinal();
+    public static final byte ACK_REQ_FEED_MESSAGE = (byte) MessageType.ACK_REQUEST.ordinal();;
+}
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 5346bf2..6921392 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -31,7 +31,6 @@
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -64,7 +63,7 @@
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ITupleParser createTupleParser(final IHyracksCommonContext ctx) throws HyracksDataException {
+            public ITupleParser createTupleParser(final IHyracksTaskContext ctx) throws HyracksDataException {
                 ADMDataParser parser;
                 ITupleForwarder forwarder;
                 ArrayTupleBuilder tb;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index c69cd16..7ef51cb 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -84,6 +84,7 @@
 import org.apache.hyracks.dataflow.common.data.partition.RandomPartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
 
 /**
  * A utility class for providing helper functions for feeds
@@ -212,6 +213,11 @@
         Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorMapping = new HashMap<ConnectorDescriptorId, ConnectorDescriptorId>();
         for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) {
             IConnectorDescriptor connDesc = entry.getValue();
+            if (connDesc instanceof MToNPartitioningConnectorDescriptor) {
+                MToNPartitioningConnectorDescriptor m2nConn = (MToNPartitioningConnectorDescriptor) connDesc;
+                connDesc = new MToNPartitioningWithMessageConnectorDescriptor(altered,
+                        m2nConn.getTuplePartitionComputerFactory());
+            }
             ConnectorDescriptorId newConnId = altered.createConnectorDescriptor(connDesc);
             connectorMapping.put(entry.getKey(), newConnId);
         }