diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index 1a0ecd9..dfb73ee 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -61,7 +61,7 @@
 
         final FeedDataSource feedDataSource = (FeedDataSource) dataSource;
         FeedConnection feedConnection = feedDataSource.getFeedConnection();
-        if (feedConnection.getAppliedFunctions() == null) {
+        if (feedConnection.getAppliedFunctions() == null || feedConnection.getAppliedFunctions().size() == 0) {
             return false;
         }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 4ea524a..874cbb1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -80,6 +80,7 @@
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.constraints.Constraint;
@@ -207,12 +208,15 @@
         Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>();
         Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
         List<JobId> jobIds = new ArrayList<>();
+        FeedMetaOperatorDescriptor metaOp;
 
         for (int iter1 = 0; iter1 < jobsList.size(); iter1++) {
             FeedConnection curFeedConnection = feedConnections.get(iter1);
             JobSpecification subJob = jobsList.get(iter1);
             operatorIdMapping.clear();
             Map<OperatorDescriptorId, IOperatorDescriptor> operatorsMap = subJob.getOperatorMap();
+            FeedConnectionId feedConnectionId = new FeedConnectionId(ingestionOp.getEntityId(),
+                    feedConnections.get(iter1).getDatasetName());
 
             FeedPolicyEntity feedPolicyEntity =
                     FeedMetadataUtil.validateIfPolicyExists(curFeedConnection.getDataverseName(),
@@ -221,26 +225,36 @@
             for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorsMap.entrySet()) {
                 IOperatorDescriptor opDesc = entry.getValue();
                 OperatorDescriptorId oldId = opDesc.getOperatorId();
-                OperatorDescriptorId opId;
+                OperatorDescriptorId opId = null;
                 if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor
                         && ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) {
                     String operandId = ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
-                    FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(jobSpec,
-                            new FeedConnectionId(ingestionOp.getEntityId(),
-                                    feedConnections.get(iter1).getDatasetName()),
-                            opDesc, feedPolicyEntity.getProperties(), FeedRuntimeType.STORE, false, operandId);
+                    metaOp = new FeedMetaOperatorDescriptor(jobSpec,
+                            feedConnectionId, opDesc, feedPolicyEntity.getProperties(), FeedRuntimeType.STORE,
+                            operandId);
                     opId = metaOp.getOperatorId();
                     opDesc.setOperatorId(opId);
                 } else {
                     if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
                         AlgebricksMetaOperatorDescriptor algOp = (AlgebricksMetaOperatorDescriptor) opDesc;
-                        for (IPushRuntimeFactory runtimeFactory : algOp.getPipeline().getRuntimeFactories()) {
-                            if (runtimeFactory instanceof StreamSelectRuntimeFactory) {
-                                ((StreamSelectRuntimeFactory) runtimeFactory).retainMissing(true, 0);
+                        IPushRuntimeFactory[] runtimeFactories = algOp.getPipeline().getRuntimeFactories();
+                        // Tweak AssignOp to work with messages
+                        if (runtimeFactories[0] instanceof AssignRuntimeFactory && runtimeFactories.length > 1) {
+                            IConnectorDescriptor connectorDesc = subJob.getOperatorInputMap()
+                                    .get(opDesc.getOperatorId()).get(0);
+                            // anything on the network interface needs to be message compatible
+                            if (connectorDesc instanceof MToNPartitioningConnectorDescriptor) {
+                                metaOp = new FeedMetaOperatorDescriptor(jobSpec,
+                                        feedConnectionId, opDesc, feedPolicyEntity.getProperties(),
+                                        FeedRuntimeType.COMPUTE, null);
+                                opId = metaOp.getOperatorId();
+                                opDesc.setOperatorId(opId);
                             }
                         }
                     }
-                    opId = jobSpec.createOperatorDescriptorId(opDesc);
+                    if (opId == null) {
+                        opId = jobSpec.createOperatorDescriptorId(opDesc);
+                    }
                 }
                 operatorIdMapping.put(oldId, opId);
             }
@@ -250,9 +264,6 @@
             for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : subJob.getConnectorMap().entrySet()) {
                 IConnectorDescriptor connDesc = entry.getValue();
                 ConnectorDescriptorId newConnId;
-                if (entry.getKey().getId() == 0) {
-                    continue;
-                }
                 if (connDesc instanceof MToNPartitioningConnectorDescriptor) {
                     MToNPartitioningConnectorDescriptor m2nConn = (MToNPartitioningConnectorDescriptor) connDesc;
                     connDesc = new MToNPartitioningWithMessageConnectorDescriptor(jobSpec,
@@ -277,11 +288,8 @@
                 if (leftOp.getLeft() instanceof FeedCollectOperatorDescriptor) {
                     jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), replicateOp, iter1, leftOpDesc,
                             leftOp.getRight());
-                    jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), leftOpDesc, leftOp.getRight(),
-                            rightOpDesc, rightOp.getRight());
-                } else {
-                    jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight());
                 }
+                jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight());
             }
 
             // prepare for setting partition constraints
@@ -295,16 +303,10 @@
                 switch (lexpr.getTag()) {
                     case PARTITION_COUNT:
                         opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
-                        if (opId.getId() == 0) {
-                            continue;
-                        }
                         operatorCounts.put(operatorIdMapping.get(opId), (int) ((ConstantExpression) cexpr).getValue());
                         break;
                     case PARTITION_LOCATION:
                         opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
-                        if (opId.getId() == 0) {
-                            continue;
-                        }
                         IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(operatorIdMapping.get(opId));
                         List<LocationConstraint> locations = operatorLocations.get(opDesc.getOperatorId());
                         if (locations == null) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
index cbdc907..c31da8b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
@@ -1 +1 @@
-788
+804
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
index cbdc907..14b76cb 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
@@ -1 +1 @@
-788
+804
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index c4cb650..97e5511 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -54,8 +54,7 @@
     private final FeedRuntimeType subscriptionLocation;
 
     public FeedCollectOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId, ARecordType atype,
-            RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
-            FeedRuntimeType subscriptionLocation) {
+            RecordDescriptor rDesc, Map<String, String> feedPolicyProperties, FeedRuntimeType subscriptionLocation) {
         super(spec, 1, 1);
         this.recordDescriptors[0] = rDesc;
         this.outputType = atype;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
index d0d9f7b..cffd303 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -76,7 +76,7 @@
 
     public FeedMetaOperatorDescriptor(final JobSpecification spec, final FeedConnectionId feedConnectionId,
             final IOperatorDescriptor coreOperatorDescriptor, final Map<String, String> feedPolicyProperties,
-            final FeedRuntimeType runtimeType, final boolean enableSubscriptionMode, final String operandId) {
+            final FeedRuntimeType runtimeType, final String operandId) {
         super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
         this.feedConnectionId = feedConnectionId;
         this.feedPolicyProperties = feedPolicyProperties;
diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
index 2975e63..c31da8b 100644
--- a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
+++ b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
@@ -1 +1 @@
-788
\ No newline at end of file
+804
\ No newline at end of file
