cloning of feed job spec using wrapping operator
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 08436f1..eb148d5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -1514,7 +1514,8 @@
             metadataProvider.getConfig().put(FunctionUtils.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
             metadataProvider.getConfig().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, cbfs.getPolicyName());
             JobSpecification compiled = rewriteCompileQuery(metadataProvider, cfs.getQuery(), cbfs);
-            FeedUtil.alterJobSpecificationForFeed(compiled);
+            //FeedUtil.alterJobSpecificationForFeed(compiled);
+            JobSpecification newJobSpec = FeedUtil.alterJobSpecificationForFeed2(compiled);
 
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Altered feed ingestion spec to wrap operators");
@@ -1529,7 +1530,7 @@
                 releaseReadLatch();
                 readLatchAcquired = false;
             }
-            runJob(hcc, compiled, waitForCompletion);
+            runJob(hcc, newJobSpec, waitForCompletion);
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
index c809295..8c42aad 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
@@ -47,61 +47,87 @@
         JobSpecification altered = null;
         altered = new JobSpecification();
         Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
-        Map<OperatorDescriptorId, IOperatorDescriptor> opIdToOp = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
-        Map<IOperatorDescriptor, IOperatorDescriptor> opToOp = new HashMap<IOperatorDescriptor, IOperatorDescriptor>();
-        Map<OperatorDescriptorId, OperatorDescriptorId> opIdToOpId = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
 
-        List<IOperatorDescriptor> opToReplace = new ArrayList<IOperatorDescriptor>();
-        Iterator<OperatorDescriptorId> opIt = operatorMap.keySet().iterator();
+        // copy operators
+        Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
+        for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) {
+            IOperatorDescriptor opDesc = entry.getValue();
+            FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, opDesc);
+            oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
+        }
 
-        while (opIt.hasNext()) {
-            OperatorDescriptorId opId = opIt.next();
-            IOperatorDescriptor op = operatorMap.get(opId);
-            if (op instanceof FeedIntakeOperatorDescriptor) {
-                opIdToOp.put(opId, op);
-                opToOp.put(op, op);
-                opIdToOpId.put(op.getOperatorId(), op.getOperatorId());
-                operatorMap.put(opId, op);
-            } else if (op instanceof AlgebricksMetaOperatorDescriptor) {
-                AlgebricksMetaOperatorDescriptor mop = (AlgebricksMetaOperatorDescriptor) op;
-                IPushRuntimeFactory[] runtimeFactories = mop.getPipeline().getRuntimeFactories();
-                boolean added = false;
-                for (IPushRuntimeFactory rf : runtimeFactories) {
-                    if (rf instanceof CommitRuntimeFactory) {
-                        opIdToOp.put(opId, op);
-                        opToOp.put(op, op);
-                        opIdToOpId.put(op.getOperatorId(), op.getOperatorId());
-                        operatorMap.put(opId, op);
-                        added = true;
+        // copy connectors
+        for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) {
+            IConnectorDescriptor connDesc = entry.getValue();
+            altered.getConnectorMap().put(connDesc.getConnectorId(), connDesc);
+        }
+
+        // make connections between operators
+        for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : spec
+                .getConnectorOperatorMap().entrySet()) {
+            IConnectorDescriptor connDesc = altered.getConnectorMap().get(entry.getKey());
+            Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
+            Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight();
+
+            IOperatorDescriptor leftOpDesc = altered.getOperatorMap().get(
+                    oldNewOID.get(leftOp.getLeft().getOperatorId()));
+            IOperatorDescriptor rightOpDesc = altered.getOperatorMap().get(
+                    oldNewOID.get(rightOp.getLeft().getOperatorId()));
+
+            altered.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight());
+        }
+
+        // prepare for setting partition constraints
+        Map<OperatorDescriptorId, List<String>> operatorLocations = new HashMap<OperatorDescriptorId, List<String>>();
+        Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<OperatorDescriptorId, Integer>();
+
+        for (Constraint constraint : spec.getUserConstraints()) {
+            LValueConstraintExpression lexpr = constraint.getLValue();
+            ConstraintExpression cexpr = constraint.getRValue();
+            OperatorDescriptorId opId;
+            switch (lexpr.getTag()) {
+                case PARTITION_COUNT:
+                    opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+                    if (operatorCounts.get(opId) == null) {
+                        operatorCounts.put(opId, 1);
+                    } else {
+                        operatorCounts.put(opId, operatorCounts.get(opId) + 1);
                     }
-                }
-                if (!added) {
-                    opToReplace.add(op);
-                    IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(altered, op);
-                }
-            } else {
-                opToReplace.add(op);
-                IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(altered, op);
+                    break;
+                case PARTITION_LOCATION:
+                    opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+                    IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(opId));
+                    List<String> locations = operatorLocations.get(opDesc.getOperatorId());
+                    if (locations == null) {
+                        locations = new ArrayList<String>();
+                        operatorLocations.put(opDesc.getOperatorId(), locations);
+                    }
+                    String location = (String) ((ConstantExpression) cexpr).getValue();
+                    locations.add(location);
+                    break;
             }
         }
 
-        // operators that were not changed
-        for (OperatorDescriptorId opId : spec.getOperatorMap().keySet()) {
-            if (opIdToOp.get(opId) != null) {
-                operatorMap.put(opId, opIdToOp.get(opId));
+        // set absolute location constraints
+        for (Entry<OperatorDescriptorId, List<String>> entry : operatorLocations.entrySet()) {
+            IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(altered, opDesc,
+                    entry.getValue().toArray(new String[] {}));
+        }
+
+        // set count constraints
+        for (Entry<OperatorDescriptorId, Integer> entry : operatorCounts.entrySet()) {
+            IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
+            if (!operatorLocations.keySet().contains(entry.getKey())) {
+                PartitionConstraintHelper.addPartitionCountConstraint(altered, opDesc, entry.getValue());
             }
         }
 
-        for (IOperatorDescriptor op : opToReplace) {
-            spec.getOperatorMap().remove(op.getOperatorId());
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("New Job Spec:" + altered);
         }
 
-        for (IOperatorDescriptor op : opToReplace) {
-            IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(spec, op);
-            opIdToOp.put(op.getOperatorId(), newOp);
-            opToOp.put(op, newOp);
-            opIdToOpId.put(op.getOperatorId(), newOp.getOperatorId());
-        }
+        return altered;
 
     }
 
@@ -160,7 +186,8 @@
         }
 
         // connectors
-        
+
+        /*
         for(Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()){
             ConnectorDescriptorId cid= entry.getKey();
             IConnectorDescriptor cdesc = entry.getValue();
@@ -168,10 +195,8 @@
                 ((OneToOneConnectorDescriptor)cdesc).
             }
          }
-        
-        
-        
-        
+         */
+
         // connector operator Map
         for (ConnectorDescriptorId cid : spec.getConnectorOperatorMap().keySet()) {
             Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> p = spec
@@ -201,7 +226,8 @@
         for (OperatorDescriptorId opId : keysForRemoval) {
             spec.getOperatorOutputMap().remove(opId);
         }
-        
+
+        /*
         for(OperatorDescriptorId opId : keysForAddition.keySet()){
             List<IConnectorDescriptor> origConnectors = keysForAddition.get(opId);
             List<IConnectorDescriptor> newConnectors = new ArrayList<IConnectorDescriptor>();
@@ -209,10 +235,8 @@
                 newConnectors.add(e)
             }
                      
-        }
-        
-        
-       
+        }*/
+
         spec.getOperatorOutputMap().putAll(keysForAddition);
 
         // operator input Map