modified strategy to choose a substitute node in the event of a failure
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
index a89b6d5..6985019 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
@@ -74,7 +74,7 @@
         LinkedBlockingQueue<String> queue = FeedLifecycleListener.INSTANCE.getFeedReportQueue(feedId);
         String report = null;
         try {
-            report = queue.poll(5, TimeUnit.SECONDS);
+            report = queue.poll(25, TimeUnit.SECONDS);
         } catch (Exception e) {
             e.printStackTrace();
         }
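
The timeout widening above relies on the blocking-with-timeout contract of LinkedBlockingQueue.poll: the call waits up to the given duration and returns null, rather than throwing, if no report arrives. A minimal, self-contained sketch of that contract (class name is illustrative, not AsterixDB code):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Illustrative sketch only: demonstrates the poll-with-timeout semantics
    // the servlet depends on.
    public class PollTimeoutSketch {
        public static void main(String[] args) throws InterruptedException {
            LinkedBlockingQueue<String> reportQueue = new LinkedBlockingQueue<String>();
            // No producer is running, so this blocks for up to 25 seconds and
            // then returns null instead of raising an error.
            String report = reportQueue.poll(25, TimeUnit.SECONDS);
            System.out.println(report == null ? "no report (timed out)" : report);
        }
    }
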
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 923e841..37c6e8a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -81,6 +81,10 @@
 import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -258,7 +262,21 @@
             if (registeredFeeds.containsKey(jobId)) {
                 throw new IllegalStateException(" Feed already registered ");
             }
-            registeredFeeds.put(jobId, new FeedInfo(feedId, jobSpec, feedPolicy));
+            registeredFeeds.put(jobId, new FeedInfo(feedId, jobSpec, feedPolicy, jobId));
+        }
+
+        public void deregisterFeed(JobId jobId) {
+            FeedInfo feedInfo = registeredFeeds.remove(jobId);
+            if (feedInfo != null) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("DeRegistered Feed Info :" + feedInfo);
+                }
+            }
+        }
+
+        public void deregisterFeed(FeedInfo feedInfo) {
+            JobId jobId = feedInfo.jobId;
+            deregisterFeed(jobId);
         }
 
         @Override
@@ -280,6 +298,7 @@
                                 LOGGER.info("Job finished for feed id" + feedInfo.feedConnectionId);
                             }
                             handleJobFinishMessage(feedInfo, mesg);
+                            deregisterFeed(mesg.jobId);
                             break;
                     }
                 } catch (InterruptedException e) {
@@ -429,41 +448,90 @@
         private void handleJobFinishMessage(FeedInfo feedInfo, Message message) {
             MetadataManager.INSTANCE.acquireWriteLatch();
             MetadataTransactionContext mdTxnCtx = null;
-            try {
-                IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
-                JobInfo info = hcc.getJobInfo(message.jobId);
-                JobStatus status = info.getPendingStatus();
-                List<Exception> exceptions;
-                boolean failure = status != null && status.equals(JobStatus.FAILURE);
-                FeedActivityType activityType = FeedActivityType.FEED_END;
-                Map<String, String> details = new HashMap<String, String>();
-                if (failure) {
-                    exceptions = info.getPendingExceptions();
-                    activityType = FeedActivityType.FEED_FAILURE;
-                    details.put(FeedActivity.FeedActivityDetails.EXCEPTION_MESSAGE, exceptions.get(0).getMessage());
-                }
-                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                FeedActivity feedActivity = new FeedActivity(feedInfo.feedConnectionId.getDataverse(),
-                        feedInfo.feedConnectionId.getFeedName(), feedInfo.feedConnectionId.getDatasetName(),
-                        activityType, details);
-                MetadataManager.INSTANCE.registerFeedActivity(
-                        mdTxnCtx,
-                        new FeedConnectionId(feedInfo.feedConnectionId.getDataverse(), feedInfo.feedConnectionId
-                                .getFeedName(), feedInfo.feedConnectionId.getDatasetName()), feedActivity);
-                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            } catch (RemoteException | ACIDException | MetadataException e) {
+            boolean feedFailedDueToPostSubmissionNodeLoss = verifyReasonForFailure(feedInfo);
+            if (!feedFailedDueToPostSubmissionNodeLoss) {
                 try {
-                    MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-                } catch (RemoteException | ACIDException ae) {
-                    throw new IllegalStateException(" Unable to abort ");
+                    IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+                    JobInfo info = hcc.getJobInfo(message.jobId);
+                    JobStatus status = info.getPendingStatus();
+                    List<Exception> exceptions;
+                    boolean failure = status != null && status.equals(JobStatus.FAILURE);
+                    FeedActivityType activityType = FeedActivityType.FEED_END;
+                    Map<String, String> details = new HashMap<String, String>();
+                    if (failure) {
+                        exceptions = info.getPendingExceptions();
+                        activityType = FeedActivityType.FEED_FAILURE;
+                        details.put(FeedActivity.FeedActivityDetails.EXCEPTION_MESSAGE, exceptions.get(0).getMessage());
+                    }
+                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                    FeedActivity feedActivity = new FeedActivity(feedInfo.feedConnectionId.getDataverse(),
+                            feedInfo.feedConnectionId.getFeedName(), feedInfo.feedConnectionId.getDatasetName(),
+                            activityType, details);
+                    MetadataManager.INSTANCE.registerFeedActivity(mdTxnCtx, new FeedConnectionId(
+                            feedInfo.feedConnectionId.getDataverse(), feedInfo.feedConnectionId.getFeedName(),
+                            feedInfo.feedConnectionId.getDatasetName()), feedActivity);
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                } catch (RemoteException | ACIDException | MetadataException e) {
+                    try {
+                        MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                    } catch (RemoteException | ACIDException ae) {
+                        throw new IllegalStateException(" Unable to abort ");
+                    }
+                } catch (Exception e) {
+                    // add exception handling here
+                } finally {
+                    MetadataManager.INSTANCE.releaseWriteLatch();
                 }
-            } catch (Exception e) {
-                // add exception handling here
-            } finally {
-                MetadataManager.INSTANCE.releaseWriteLatch();
+            } else {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Attempt to revive feed");
+                }
+                FeedsActivator activator = new FeedsActivator();
+                String dataverse = feedInfo.feedConnectionId.getDataverse();
+                String datasetName = feedInfo.feedConnectionId.getDatasetName();
+                String feedName = feedInfo.feedConnectionId.getFeedName();
+                String feedPolicy = feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+                activator.reviveFeed(dataverse, feedName, datasetName, feedPolicy);
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Revived feed " + feedInfo.feedConnectionId);
+                }
             }
         }
 
+        private boolean verifyReasonForFailure(FeedInfo feedInfo) {
+            JobSpecification spec = feedInfo.jobSpec;
+            Set<Constraint> userConstraints = spec.getUserConstraints();
+            List<String> locations = new ArrayList<String>();
+            for (Constraint constraint : userConstraints) {
+                LValueConstraintExpression lexpr = constraint.getLValue();
+                ConstraintExpression cexpr = constraint.getRValue();
+                switch (lexpr.getTag()) {
+                    case PARTITION_LOCATION:
+                        String location = (String) ((ConstantExpression) cexpr).getValue();
+                        locations.add(location);
+                        break;
+                }
+            }
+            Set<String> participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
+            List<String> nodesFailedPostSubmission = new ArrayList<String>();
+            for (String location : locations) {
+                if (!participantNodes.contains(location)) {
+                    nodesFailedPostSubmission.add(location);
+                }
+            }
+
+            if (nodesFailedPostSubmission.size() > 0) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Feed failed as nodes failed post submission");
+                }
+                return true;
+            } else {
+                return false;
+            }
+        }
+
         public static class FeedMessengerMessage {
             private final IFeedMessage message;
             private final FeedInfo feedInfo;
@@ -522,11 +590,13 @@
         public List<String> storageLocations = new ArrayList<String>();
         public JobInfo jobInfo;
         public Map<String, String> feedPolicy;
+        public JobId jobId;
 
-        public FeedInfo(FeedConnectionId feedId, JobSpecification jobSpec, Map<String, String> feedPolicy) {
+        public FeedInfo(FeedConnectionId feedId, JobSpecification jobSpec, Map<String, String> feedPolicy, JobId jobId) {
             this.feedConnectionId = feedId;
             this.jobSpec = jobSpec;
             this.feedPolicy = feedPolicy;
+            this.jobId = jobId;
         }
 
         @Override
@@ -542,6 +612,11 @@
             return feedConnectionId.hashCode();
         }
 
+        @Override
+        public String toString() {
+            return feedConnectionId + " job id " + jobId;
+        }
+
     }
 
     @Override
@@ -556,6 +631,9 @@
                         failures = new ArrayList<FeedFailure>();
                         failureReport.failures.put(feedInfo, failures);
                     }
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Inestion Node Failure! " + deadNodeId);
+                    }
                     failures.add(new FeedFailure(FeedFailure.FailureType.INGESTION_NODE, deadNodeId));
                 }
                 if (feedInfo.computeLocations.contains(deadNodeId)) {
@@ -564,6 +642,9 @@
                         failures = new ArrayList<FeedFailure>();
                         failureReport.failures.put(feedInfo, failures);
                     }
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Compute Node Failure! " + deadNodeId);
+                    }
                     failures.add(new FeedFailure(FeedFailure.FailureType.COMPUTE_NODE, deadNodeId));
                 }
                 if (feedInfo.storageLocations.contains(deadNodeId)) {
@@ -572,6 +653,9 @@
                         failures = new ArrayList<FeedFailure>();
                         failureReport.failures.put(feedInfo, failures);
                     }
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Storage Node Failure! " + deadNodeId);
+                    }
                     failures.add(new FeedFailure(FeedFailure.FailureType.STORAGE_NODE, deadNodeId));
                 }
             }
@@ -596,6 +680,7 @@
                 builder.append("\n");
                 for (FeedInfo fInfo : failureReport.failures.keySet()) {
                     builder.append(fInfo.feedConnectionId);
+                    feedJobNotificationHandler.deregisterFeed(fInfo);
                 }
                 LOGGER.warning(builder.toString());
             }
@@ -699,6 +784,17 @@
             }
             AddNodeWork addNodesWork = new AddNodeWork(failureMap.keySet().size(), this);
             work.add(addNodesWork);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                Map<FeedInfo, List<FeedFailure>> feedFailures = failureReport.failures;
+                for (Entry<FeedInfo, List<FeedFailure>> entry : feedFailures.entrySet()) {
+                    for (FeedFailure f : entry.getValue()) {
+                        LOGGER.info("Feed Failure! " + f.failureType + " " + f.nodeId);
+                    }
+                }
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Registered work id: " + addNodesWork.getWorkId());
+            }
             feedWorkRequestResponseHandler.registerFeedWork(addNodesWork.getWorkId(), failureReport);
         } else {
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -992,7 +1088,7 @@
             }
         }
 
-        private void reviveFeed(String dataverse, String feedName, String dataset, String feedPolicy) {
+        public void reviveFeed(String dataverse, String feedName, String dataset, String feedPolicy) {
             PrintWriter writer = new PrintWriter(System.out, true);
             SessionConfig pc = new SessionConfig(true, false, false, false, false, false, true, true, false);
             try {
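
The verifyReasonForFailure check above reduces the recovery decision to a set difference: locations pinned by the job's PARTITION_LOCATION constraints that are no longer participant nodes. A self-contained sketch of that classification with plain collections in place of the Hyracks constraint types (node ids are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Illustrative sketch of the failure classification: a feed failure is
    // treated as post-submission node loss iff some pinned location is no
    // longer among the alive participant nodes.
    public class NodeLossCheckSketch {
        static boolean failedDueToNodeLoss(List<String> pinnedLocations, Set<String> participantNodes) {
            List<String> nodesFailedPostSubmission = new ArrayList<String>();
            for (String location : pinnedLocations) {
                if (!participantNodes.contains(location)) {
                    nodesFailedPostSubmission.add(location);
                }
            }
            return !nodesFailedPostSubmission.isEmpty();
        }

        public static void main(String[] args) {
            List<String> pinned = Arrays.asList("nc1", "nc2", "nc3");
            Set<String> alive = new HashSet<String>(Arrays.asList("nc1", "nc3"));
            System.out.println(failedDueToNodeLoss(pinned, alive)); // true: nc2 was lost
        }
    }
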
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
index e88bff8..3197bbb 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
@@ -16,6 +16,7 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -92,7 +93,7 @@
                     Set<FeedInfo> affectedFeeds = failureReport.failures.keySet();
                     for (FeedInfo feedInfo : affectedFeeds) {
                         try {
-                            recoverFeed(feedInfo, resp, failureReport.failures.get(feedInfo));
+                            recoverFeed(feedInfo, work, resp, failureReport.failures.get(feedInfo));
                             if (LOGGER.isLoggable(Level.INFO)) {
                                 LOGGER.info("Recovered feed:" + feedInfo);
                             }
@@ -109,73 +110,124 @@
         }
     }
 
-    private void recoverFeed(FeedInfo feedInfo, AddNodeWorkResponse resp, List<FeedFailure> feedFailures)
-            throws Exception {
+    private void recoverFeed(FeedInfo feedInfo, AddNodeWork work, AddNodeWorkResponse resp,
+            List<FeedFailure> feedFailures) throws Exception {
+        List<String> failedNodeIds = new ArrayList<String>();
         for (FeedFailure feedFailure : feedFailures) {
-            switch (feedFailure.failureType) {
-                case INGESTION_NODE:
-                    alterFeedJobSpec(feedInfo, resp, feedFailure.nodeId);
-                    break;
-            }
+            failedNodeIds.add(feedFailure.nodeId);
         }
-        JobSpecification spec = feedInfo.jobSpec;
-        System.out.println("Altered Job Spec \n" + spec);
-        Thread.sleep(5000);
-        AsterixAppContextInfo.getInstance().getHcc().startJob(feedInfo.jobSpec);
-    }
-
-    private void alterFeedJobSpec(FeedInfo feedInfo, AddNodeWorkResponse resp, String failedNodeId) {
-        String replacementNode = null;
+        List<String> chosenReplacements = new ArrayList<String>();
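+        // The metadata node is seeded into chosenReplacements so that the
+        // candidate pool below excludes it and it is never picked as a substitute.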
+        String metadataNodeName = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
+        chosenReplacements.add(metadataNodeName);
         switch (resp.getStatus()) {
             case FAILURE:
-                // TODO 1st preference is given to any other participant node that is not involved in the feed.
-                //      2nd preference is given to a compute node.
-                //      3rd preference is given to a storage node
-                Set<String> participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
-                if (participantNodes != null && !participantNodes.isEmpty()) {
-                    participantNodes.removeAll(feedInfo.storageLocations);
-                    participantNodes.removeAll(feedInfo.computeLocations);
-                    if (!participantNodes.isEmpty()) {
-                        String[] participantNodesArray = AsterixClusterProperties.INSTANCE.getParticipantNodes()
-                                .toArray(new String[] {});
-
-                        replacementNode = participantNodesArray[new Random().nextInt(participantNodesArray.length)];
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Participant Node: " + replacementNode + " chosen as replacement for "
-                                    + failedNodeId);
-                        }
-                    }
-                }
-
-                if (replacementNode == null) {
-                    boolean computeNodeSubstitute = (feedInfo.computeLocations.contains(failedNodeId) && feedInfo.computeLocations
-                            .size() > 1);
-                    if (computeNodeSubstitute) {
-                        feedInfo.computeLocations.remove(failedNodeId);
-                        replacementNode = feedInfo.computeLocations.get(0);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Compute node:" + replacementNode + " chosen to replace " + failedNodeId);
-                        }
-                    } else {
-                        replacementNode = feedInfo.storageLocations.get(0);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Storage node:" + replacementNode + " chosen to replace " + failedNodeId);
-                        }
+                for (FeedFailure feedFailure : feedFailures) {
+                    switch (feedFailure.failureType) {
+                        case INGESTION_NODE:
+                            String replacement = getInternalReplacement(feedInfo, feedFailure, failedNodeIds,
+                                    chosenReplacements);
+                            chosenReplacements.add(replacement);
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Existing  node:" + replacement + " chosen to replace "
+                                        + feedFailure.nodeId);
+                            }
+                            alterFeedJobSpec(feedInfo, resp, feedFailure.nodeId, replacement);
+                            break;
                     }
                 }
                 break;
             case SUCCESS:
-                Random r = new Random();
-                String[] rnodes = resp.getNodesAdded().toArray(new String[] {});
-                replacementNode = rnodes[r.nextInt(rnodes.length)];
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Newly added node:" + replacementNode + " chosen to replace " + failedNodeId);
+                List<String> nodesAdded = resp.getNodesAdded();
+                int numNodesAdded = nodesAdded.size();
+                int nodeIndex = 0;
+                for (FeedFailure feedFailure : feedFailures) {
+                    switch (feedFailure.failureType) {
+                        case INGESTION_NODE:
+                            String replacement = null;
+                            if (nodeIndex < numNodesAdded) {
+                                replacement = nodesAdded.get(nodeIndex);
+                                if (LOGGER.isLoggable(Level.INFO)) {
+                                    LOGGER.info("Newly added node:" + replacement + " chosen to replace "
+                                            + feedFailure.nodeId);
+                                }
+                            } else {
+                                replacement = getInternalReplacement(feedInfo, feedFailure, failedNodeIds,
+                                        chosenReplacements);
+                                if (LOGGER.isLoggable(Level.INFO)) {
+                                    LOGGER.info("Existing node:" + replacement + " chosen to replace "
+                                            + feedFailure.nodeId);
+                                }
+                                chosenReplacements.add(replacement);
+                            }
+                            alterFeedJobSpec(feedInfo, resp, feedFailure.nodeId, replacement);
+                            nodeIndex++;
+                            break;
+                        default: // Ingestion nodes and compute nodes coincide in the current implementation,
+                                 // so correcting an ingestion node failure also takes care of a compute node
+                                 // failure. Storage node failures cannot be recovered from, as the current
+                                 // implementation does not have data replication.
+                    }
                 }
-
                 break;
         }
 
-        if (replacementNode == null) {
+        JobSpecification spec = feedInfo.jobSpec;
+        System.out.println("Final recovery Job Spec \n" + spec);
+        Thread.sleep(5000);
+        AsterixAppContextInfo.getInstance().getHcc().startJob(feedInfo.jobSpec);
+    }
+
+    private String getInternalReplacement(FeedInfo feedInfo, FeedFailure feedFailure, List<String> failedNodeIds,
+            List<String> chosenReplacements) {
+        String failedNodeId = feedFailure.nodeId;
+        String replacement = null;
+        // Replacement preference: (1) a participant node not already involved in the feed,
+        //                         (2) a surviving compute node used by the feed,
+        //                         (3) a storage node used by the feed.
+        Set<String> participantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
+        if (participantNodes != null && !participantNodes.isEmpty()) {
+            List<String> pNodesClone = new ArrayList<String>();
+            pNodesClone.addAll(participantNodes);
+            pNodesClone.removeAll(feedInfo.storageLocations);
+            pNodesClone.removeAll(feedInfo.computeLocations);
+            pNodesClone.removeAll(feedInfo.ingestLocations);
+            pNodesClone.removeAll(chosenReplacements);
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                for (String candidateNode : pNodesClone) {
+                    LOGGER.info("Candidate for replacement:" + candidateNode);
+                }
+            }
+            if (!pNodesClone.isEmpty()) {
+                String[] participantNodesArray = pNodesClone.toArray(new String[] {});
+
+                replacement = participantNodesArray[new Random().nextInt(participantNodesArray.length)];
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Participant Node: " + replacement + " chosen as replacement for " + failedNodeId);
+                }
+            }
+        }
+
+        if (replacement == null) {
+            feedInfo.computeLocations.removeAll(failedNodeIds);
+            boolean computeNodeSubstitute = (feedInfo.computeLocations.size() > 1);
+            if (computeNodeSubstitute) {
+                replacement = feedInfo.computeLocations.get(0);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Compute node:" + replacement + " chosen to replace " + failedNodeId);
+                }
+            } else {
+                replacement = feedInfo.storageLocations.get(0);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Storage node:" + replacement + " chosen to replace " + failedNodeId);
+                }
+            }
+        }
+        return replacement;
+    }
+
+    private void alterFeedJobSpec(FeedInfo feedInfo, AddNodeWorkResponse resp, String failedNodeId, String replacement) {
+        if (replacement == null) {
             if (LOGGER.isLoggable(Level.SEVERE)) {
                 LOGGER.severe("Unable to find replacement for failed node :" + failedNodeId);
                 LOGGER.severe("Feed: " + feedInfo.feedConnectionId + " will be terminated");
@@ -185,7 +237,7 @@
             Thread t = new Thread(new FeedsDeActivator(feedsToTerminate));
             t.start();
         } else {
-            replaceNode(feedInfo.jobSpec, failedNodeId, replacementNode);
+            replaceNode(feedInfo.jobSpec, failedNodeId, replacement);
         }
     }
 
@@ -195,7 +247,7 @@
         List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
         List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
         Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
-        Map<OperatorDescriptorId, List<String>> newConstraints = new HashMap<OperatorDescriptorId, List<String>>();
+        Map<OperatorDescriptorId, Map<Integer, String>> newConstraints = new HashMap<OperatorDescriptorId, Map<Integer, String>>();
         OperatorDescriptorId opId = null;
         for (Constraint constraint : userConstraints) {
             LValueConstraintExpression lexpr = constraint.getLValue();
@@ -220,21 +272,23 @@
                     if (oldLocation.equals(failedNodeId)) {
                         locationConstraintsToReplace.add(constraint);
                         modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
-                        List<String> newLocs = newConstraints.get(opId);
+                        Map<Integer, String> newLocs = newConstraints.get(opId);
                         if (newLocs == null) {
-                            newLocs = new ArrayList<String>();
+                            newLocs = new HashMap<Integer, String>();
                             newConstraints.put(opId, newLocs);
                         }
-                        newLocs.add(replacementNode);
+                        int partition = ((PartitionLocationExpression) lexpr).getPartition();
+                        newLocs.put(partition, replacementNode);
                     } else {
                         if (modifiedOperators.contains(opId)) {
                             locationConstraintsToReplace.add(constraint);
-                            List<String> newLocs = newConstraints.get(opId);
+                            Map<Integer, String> newLocs = newConstraints.get(opId);
                             if (newLocs == null) {
-                                newLocs = new ArrayList<String>();
+                                newLocs = new HashMap<Integer, String>();
                                 newConstraints.put(opId, newLocs);
                             }
-                            newLocs.add(oldLocation);
+                            int partition = ((PartitionLocationExpression) lexpr).getPartition();
+                            newLocs.put(partition, oldLocation);
                         } else {
                             List<Constraint> clist = candidateConstraints.get(opId);
                             if (clist == null) {
@@ -259,18 +313,23 @@
                 for (Constraint c : clist) {
                     if (c.getLValue().getTag().equals(ExpressionTag.PARTITION_LOCATION)) {
                         ConstraintExpression cexpr = c.getRValue();
+                        int partition = ((PartitionLocationExpression) c.getLValue()).getPartition();
                         String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
-                        newConstraints.get(mopId).add(oldLocation);
+                        newConstraints.get(mopId).put(partition, oldLocation);
                     }
                 }
             }
         }
 
-        for (Entry<OperatorDescriptorId, List<String>> entry : newConstraints.entrySet()) {
+        for (Entry<OperatorDescriptorId, Map<Integer, String>> entry : newConstraints.entrySet()) {
             OperatorDescriptorId nopId = entry.getKey();
-            List<String> clist = entry.getValue();
+            Map<Integer, String> clist = entry.getValue();
             IOperatorDescriptor op = jobSpec.getOperatorMap().get(nopId);
-            PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, clist.toArray(new String[] {}));
+            String[] locations = new String[clist.size()];
+            for (int i = 0; i < locations.length; i++) {
+                locations[i] = clist.get(i);
+            }
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, locations);
         }
 
     }
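
The switch from List&lt;String&gt; to Map&lt;Integer, String&gt; above ties each replacement location to its partition index, so the rebuilt String[] keeps every partition in place; a plain list would re-add locations in encounter order. A small sketch of the rebuild step (node ids are illustrative):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch of rebuilding an absolute location constraint from
    // partition-indexed replacement locations.
    public class PartitionRebuildSketch {
        public static void main(String[] args) {
            Map<Integer, String> newLocs = new HashMap<Integer, String>();
            newLocs.put(1, "nc3"); // partition 1: failed node replaced by nc3
            newLocs.put(0, "nc1"); // partition 0: original location retained
            String[] locations = new String[newLocs.size()];
            for (int i = 0; i < locations.length; i++) {
                locations[i] = newLocs.get(i); // index i holds partition i's node
            }
            System.out.println(Arrays.toString(locations)); // [nc1, nc3]
        }
    }
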
diff --git a/asterix-app/src/main/resources/feed/dashboard.html b/asterix-app/src/main/resources/feed/dashboard.html
index a5f1813..805f8ea 100644
--- a/asterix-app/src/main/resources/feed/dashboard.html
+++ b/asterix-app/src/main/resources/feed/dashboard.html
@@ -78,9 +78,9 @@
         function initTimeline(ingestLocations) {
 
           document.write("<i>" + "Feed Ingestion" + " "  + "<i>");
-          document.write("<br />" + "Ingestion Locations: " + ingestLocations);
-          document.write("<br />" + "Compute Locations: " + computeLocations);
-          document.write("<br />" + "Storage Locations: " + storageLocations);
+          document.write("<br />" + "Ingestion Locations: " + ingestLocations.replace(",",", "));
+          document.write("<br />" + "Compute Locations: " + computeLocations.replace(",",", "));
+          document.write("<br />" + "Storage Locations: " + storageLocations.replace(",",", "));
           document.write("<br />" + "Ingestion Policy: " + ingestionPolicy);
           document.write("<br />" + "Status: " + state);
           document.write("<br />");
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 55e6664..8c317d8 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -23,6 +23,7 @@
 	<xs:element name="cluster_port" type="xs:integer" />
 	<xs:element name="http_port" type="xs:integer" />
 	<xs:element name="debug_port" type="xs:integer" />
+	<xs:element name="metadata_node" type="xs:string" />
 	
 
 	<!-- definition of complex elements -->
@@ -104,6 +105,7 @@
 				<xs:element ref="cl:store" minOccurs="0" />
 				<xs:element ref="cl:iodevices" minOccurs="0" />
 				<xs:element ref="cl:working_dir" />
+				<xs:element ref="cl:metadata_node" minOccurs="0" />
 				<xs:element ref="cl:master_node" />
 				<xs:element ref="cl:node" maxOccurs="unbounded" />
 				<xs:element ref="cl:substitute_nodes" />
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
index 7f89cb9..2a70862 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
@@ -81,7 +81,7 @@
 
     public static AsterixInstance createAsterixInstance(String asterixInstanceName, Cluster cluster,
             AsterixConfiguration asterixConfiguration) throws FileNotFoundException, IOException {
-        Node metadataNode = getMetadataNode(cluster);
+        Node metadataNode = getMetadataNode(asterixInstanceName, cluster);
         String asterixZipName = AsterixEventService.getAsterixZip().substring(
                 AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
         String asterixVersion = asterixZipName.substring("asterix-server-".length(),
@@ -241,10 +241,21 @@
         Runtime.getRuntime().exec("mv" + " " + destZip + " " + sourceZip);
     }
 
-    private static Node getMetadataNode(Cluster cluster) {
-        Random random = new Random();
-        int nNodes = cluster.getNode().size();
-        return cluster.getNode().get(random.nextInt(nNodes));
+    private static Node getMetadataNode(String asterixInstanceName, Cluster cluster) {
+        Node metadataNode = null;
+        if (cluster.getMetadataNode() != null) {
+            for (Node node : cluster.getNode()) {
+                if (node.getId().equals(cluster.getMetadataNode())) {
+                    metadataNode = node;
+                    break;
+                }
+            }
+        } else {
+            Random random = new Random();
+            int nNodes = cluster.getNode().size();
+            metadataNode = cluster.getNode().get(random.nextInt(nNodes));
+        }
+        return metadataNode;
     }
 
     public static String getNodeDirectories(String asterixInstanceName, Node node, Cluster cluster) {
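
A condensed sketch of the selection rule with plain strings (node ids are hypothetical): the explicitly configured metadata node wins, and the old random pick remains only as a fallback. Unlike the code above, the sketch also falls back when the configured id matches no node:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;

    // Illustrative sketch of metadata-node selection: configured id first,
    // random fallback otherwise.
    public class MetadataNodePickSketch {
        static String pickMetadataNode(String configuredId, List<String> nodeIds) {
            if (configuredId != null && nodeIds.contains(configuredId)) {
                return configuredId;
            }
            return nodeIds.get(new Random().nextInt(nodeIds.size()));
        }

        public static void main(String[] args) {
            List<String> nodes = Arrays.asList("nc1", "nc2", "nc3");
            System.out.println(pickMetadataNode("nc2", nodes)); // nc2
            System.out.println(pickMetadataNode(null, nodes));  // some random node
        }
    }
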
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index a6c7c18..53038d2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -224,6 +224,7 @@
             }
             if (collectThroughput) {
                 int instantTput = (int) Math.ceil((((double) numTuplesInInterval.get() * 1000) / period));
+                System.out.println("MEASURED TPUT:" + numTuplesInInterval.get());
                 sendReportToSFM(instantTput, FeedReportMessageType.THROUGHPUT, System.currentTimeMillis());
             }
             numTuplesInInterval.set(0);
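
The added println exposes the raw tuple count behind instantTput, which the surrounding code computes as tuples observed in the interval scaled to tuples per second. A worked example of that formula (values are made up):

    // Illustrative arithmetic only: 5000 tuples over a 2000 ms period.
    public class ThroughputSketch {
        public static void main(String[] args) {
            long numTuplesInInterval = 5000; // tuples seen during the last interval
            long period = 2000;              // interval length in milliseconds
            int instantTput = (int) Math.ceil(((double) numTuplesInInterval * 1000) / period);
            System.out.println(instantTput); // 2500 tuples per second
        }
    }
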
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 58615fc..9a70ca3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -101,7 +101,7 @@
         } catch (InterruptedException ie) {
             if (policyEnforcer.getFeedPolicyAccessor().continueOnHardwareFailure()) {
                 if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Continuing on failure as per feed policy");
+                    LOGGER.info("Continuing on failure as per feed policy, switching to INACTIVE INGESTION temporarily");
                 }
                 adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
                 FeedRuntimeManager runtimeMgr = FeedManager.INSTANCE.getFeedRuntimeManager(feedId);
@@ -114,6 +114,10 @@
                 }
                 feedFrameWriter.fail();
             } else {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Received InterruptedException; feed policy does not permit continuing on hardware failure, terminating ingestion");
+                }
                 FeedManager.INSTANCE.deRegisterFeedRuntime(ingestionRuntime.getFeedRuntimeId());
                 feedFrameWriter.close();
                 throw new HyracksDataException(ie);
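
The branch above gives the interrupt two possible outcomes: park the adapter in INACTIVE_INGESTION when the policy tolerates hardware failure, or deregister, close, and rethrow otherwise. A toy sketch of that decision (enum and method names are hypothetical):

    // Illustrative sketch of the intake operator's interrupt-handling contract.
    public class IntakeInterruptSketch {
        enum State { INACTIVE_INGESTION, TERMINATED }

        static State onInterrupt(boolean continueOnHardwareFailure) {
            // Mirrors the policy branch: survive in an inactive state, or fail hard.
            return continueOnHardwareFailure ? State.INACTIVE_INGESTION : State.TERMINATED;
        }

        public static void main(String[] args) {
            System.out.println(onInterrupt(true));  // INACTIVE_INGESTION
            System.out.println(onInterrupt(false)); // TERMINATED
        }
    }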