checkpoint
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 41cda8a..08436f1 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -72,6 +72,7 @@
 import edu.uci.ics.asterix.file.DatasetOperations;
 import edu.uci.ics.asterix.file.DataverseOperations;
 import edu.uci.ics.asterix.file.FeedOperations;
+import edu.uci.ics.asterix.file.FeedUtil;
 import edu.uci.ics.asterix.file.IndexOperations;
 import edu.uci.ics.asterix.formats.base.IDataFormat;
 import edu.uci.ics.asterix.metadata.IDatasetDetails;
@@ -95,7 +96,6 @@
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
 import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -1516,6 +1516,10 @@
             JobSpecification compiled = rewriteCompileQuery(metadataProvider, cfs.getQuery(), cbfs);
             FeedUtil.alterJobSpecificationForFeed(compiled);
 
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Altered feed ingestion spec to wrap operators");
+            }
+
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             String waitForCompletionParam = metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
new file mode 100644
index 0000000..c0d6005
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
@@ -0,0 +1,193 @@
+package edu.uci.ics.asterix.file;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.asterix.algebra.operators.physical.CommitRuntimeFactory;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedUtil {
+
+    private static Logger LOGGER = Logger.getLogger(FeedUtil.class.getName());
+
+    public static boolean isFeedActive(FeedActivity feedActivity) {
+        return (feedActivity != null && !(feedActivity.getActivityType().equals(FeedActivityType.FEED_END) || feedActivity
+                .getActivityType().equals(FeedActivityType.FEED_FAILURE)));
+    }
+
+    public static void alterJobSpecificationForFeed(JobSpecification spec) {
+
+        Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
+        Map<OperatorDescriptorId, IOperatorDescriptor> opIdToOp = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
+        Map<IOperatorDescriptor, IOperatorDescriptor> opToOp = new HashMap<IOperatorDescriptor, IOperatorDescriptor>();
+        Map<OperatorDescriptorId, OperatorDescriptorId> opIdToOpId = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
+
+        Iterator<OperatorDescriptorId> opIt = operatorMap.keySet().iterator();
+        IOperatorDescriptor ingestOp = null;
+        OperatorDescriptorId ingestOpId = null;
+        List<IOperatorDescriptor> opToReplace = new ArrayList<IOperatorDescriptor>();
+        while (opIt.hasNext()) {
+            OperatorDescriptorId opId = opIt.next();
+            IOperatorDescriptor op = operatorMap.get(opId);
+            if (op instanceof FeedIntakeOperatorDescriptor) {
+                opIdToOp.put(opId, op);
+                opToOp.put(op, op);
+                opIdToOpId.put(op.getOperatorId(), op.getOperatorId());
+            } else if (op instanceof AlgebricksMetaOperatorDescriptor) {
+                AlgebricksMetaOperatorDescriptor mop = (AlgebricksMetaOperatorDescriptor) op;
+                IPushRuntimeFactory[] runtimeFactories = mop.getPipeline().getRuntimeFactories();
+                boolean added = false;
+                for (IPushRuntimeFactory rf : runtimeFactories) {
+                    if (rf instanceof CommitRuntimeFactory) {
+                        opIdToOp.put(opId, op);
+                        opToOp.put(op, op);
+                        opIdToOpId.put(op.getOperatorId(), op.getOperatorId());
+                        added = true;
+                    }
+                }
+                if (!added) {
+                    opToReplace.add(op);
+                }
+            } else {
+                opToReplace.add(op);
+            }
+        }
+
+        // operator map
+        for (OperatorDescriptorId opId : spec.getOperatorMap().keySet()) {
+            if (opIdToOp.get(opId) != null) {
+                operatorMap.put(opId, opIdToOp.get(opId));
+            }
+        }
+
+        for (IOperatorDescriptor op : opToReplace) {
+            spec.getOperatorMap().remove(op.getOperatorId());
+        }
+
+        for (IOperatorDescriptor op : opToReplace) {
+            IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(spec, op);
+            opIdToOp.put(op.getOperatorId(), newOp);
+            opToOp.put(op, newOp);
+            opIdToOpId.put(op.getOperatorId(), newOp.getOperatorId());
+        }
+
+        // connector operator Map
+        for (ConnectorDescriptorId cid : spec.getConnectorOperatorMap().keySet()) {
+            Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> p = spec
+                    .getConnectorOperatorMap().get(cid);
+
+            Pair<IOperatorDescriptor, Integer> leftPair = p.getLeft();
+            Pair<IOperatorDescriptor, Integer> newLeftPair = Pair.of(opToOp.get(leftPair.getLeft()),
+                    leftPair.getRight());
+
+            Pair<IOperatorDescriptor, Integer> newRightPair = Pair.of(opToOp.get(p.getRight().getLeft()), p.getRight()
+                    .getRight());
+
+            spec.getConnectorOperatorMap().put(cid, Pair.of(newLeftPair, newRightPair));
+        }
+
+        // operator Output Map
+        Set<OperatorDescriptorId> keysForRemoval = new HashSet<OperatorDescriptorId>();
+        Map<OperatorDescriptorId, List<IConnectorDescriptor>> keysForAddition = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
+        for (Entry<OperatorDescriptorId, List<IConnectorDescriptor>> entry : spec.getOperatorOutputMap().entrySet()) {
+            OperatorDescriptorId opId = entry.getKey();
+            if (!opIdToOpId.get(opId).equals(opId)) {
+                keysForRemoval.add(opId);
+                keysForAddition.put(opIdToOpId.get(opId), entry.getValue());
+            }
+        }
+
+        for (OperatorDescriptorId opId : keysForRemoval) {
+            spec.getOperatorOutputMap().remove(opId);
+        }
+        spec.getOperatorOutputMap().putAll(keysForAddition);
+
+        // operator input Map
+        keysForRemoval.clear();
+        keysForAddition.clear();
+        for (Entry<OperatorDescriptorId, List<IConnectorDescriptor>> entry : spec.getOperatorInputMap().entrySet()) {
+            OperatorDescriptorId opId = entry.getKey();
+            if (!opIdToOpId.get(opId).equals(opId)) {
+                keysForRemoval.add(opId);
+                keysForAddition.put(opIdToOpId.get(opId), entry.getValue());
+            }
+        }
+
+        for (OperatorDescriptorId opId : keysForRemoval) {
+            spec.getOperatorInputMap().remove(opId);
+        }
+        spec.getOperatorInputMap().putAll(keysForAddition);
+
+        Set<Constraint> userConstraints = spec.getUserConstraints();
+        Set<Constraint> constraintsForRemoval = new HashSet<Constraint>();
+        Map<OperatorDescriptorId, List<String>> constraintsForAddition = new HashMap<OperatorDescriptorId, List<String>>();
+
+        OperatorDescriptorId opId;
+        for (Constraint constraint : userConstraints) {
+            LValueConstraintExpression lexpr = constraint.getLValue();
+            ConstraintExpression cexpr = constraint.getRValue();
+            switch (lexpr.getTag()) {
+                case PARTITION_COUNT:
+                    opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+                    if (!opIdToOpId.get(opId).equals(opId)) {
+                        constraintsForRemoval.add(constraint);
+                    }
+                    break;
+                case PARTITION_LOCATION:
+                    opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+                    if (!opIdToOpId.get(opId).equals(opId)) {
+                        constraintsForRemoval.add(constraint);
+                        String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+                        List<String> locations = constraintsForAddition.get(opId);
+                        if (locations == null) {
+                            locations = new ArrayList<String>();
+                            constraintsForAddition.put(opId, locations);
+                        }
+                        locations.add(oldLocation);
+                    }
+                    break;
+            }
+        }
+
+        spec.getUserConstraints().removeAll(constraintsForRemoval);
+        for (Entry<OperatorDescriptorId, List<String>> entry : constraintsForAddition.entrySet()) {
+            OperatorDescriptorId oldOpId = entry.getKey();
+            OperatorDescriptorId newOpId = opIdToOpId.get(oldOpId);
+            if (!newOpId.equals(oldOpId)) {
+                PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, opIdToOp.get(oldOpId), entry.getValue()
+                        .toArray(new String[] {}));
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Modified job spec with wrapped operators\n" + spec);
+        }
+    }
+}
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index 21caf28..4d27e48 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -131,6 +131,8 @@
 
     @Test
     public void test() throws Exception {
-        TestsUtils.executeTest(PATH_ACTUAL, tcCtx);
+        if (tcCtx.getTestCase().getCompilationUnit().get(0).getName().contains("feeds")) {
+            TestsUtils.executeTest(PATH_ACTUAL, tcCtx);
+        }
     }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
index b726955..fae2fe4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
@@ -44,4 +44,6 @@
         return new FeedMessageOperatorNodePushable(ctx, feedId, feedMessage, partition, nPartitions);
     }
 
+   
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index 1f3c43a..4ad7617 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -27,6 +27,11 @@
         return new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator);
     }
 
+    @Override
+    public String toString() {
+        return coreOperator.getDisplayName();
+    }
+
     private static class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
 
         private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperatorNodePushable;
@@ -39,6 +44,8 @@
 
         @Override
         public void open() throws HyracksDataException {
+            //  coreOperatorNodePushable.setOutputFrameWriter(partition, coreOperatorNodePushable, recordDescProvider);
+            coreOperatorNodePushable.setOutputFrameWriter(0, writer, recordDesc);
             coreOperatorNodePushable.open();
         }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
deleted file mode 100644
index ed4c532..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public class FeedUtil {
-
-    public static boolean isFeedActive(FeedActivity feedActivity) {
-        return (feedActivity != null && !(feedActivity.getActivityType().equals(FeedActivityType.FEED_END) || feedActivity
-                .getActivityType().equals(FeedActivityType.FEED_FAILURE)));
-    }
-
-    public static void alterJobSpecificationForFeed(JobSpecification spec) {
-
-        Map<OperatorDescriptorId, IOperatorDescriptor> r1 = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
-        Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
-        Map<OperatorDescriptorId, IOperatorDescriptor> opIdToOp = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
-        Map<IOperatorDescriptor, IOperatorDescriptor> opToOp = new HashMap<IOperatorDescriptor, IOperatorDescriptor>();
-
-        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : spec.getOperatorMap().entrySet()) {
-            OperatorDescriptorId opId = entry.getKey();
-            IOperatorDescriptor op = entry.getValue();
-            if (!(op instanceof FeedIntakeOperatorDescriptor)) {
-                IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(spec, op);
-                opIdToOp.put(opId, newOp);
-                opToOp.put(op, newOp);
-            } else {
-                opIdToOp.put(opId, op);
-                opToOp.put(op, op);
-            }
-        }
-
-        for (OperatorDescriptorId opId : spec.getOperatorMap().keySet()) {
-            operatorMap.put(opId, opIdToOp.get(opId));
-        }
-
-        Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> connectorOpMap = spec
-                .getConnectorOperatorMap();
-        Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> r2 = new HashMap<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>>();
-        for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : connectorOpMap
-                .entrySet()) {
-            IOperatorDescriptor opLeft = entry.getValue().getLeft().getLeft();
-            IOperatorDescriptor opRight = entry.getValue().getRight().getLeft();
-
-            if ((!opLeft instanceof FeedIntakeOperatorDescriptor)) {
-
-            }
-
-        }
-
-    }
-}