checkpoint
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterEventHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterEventHandler.java
new file mode 100644
index 0000000..091cf4b
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterEventHandler.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.hyracks.bootstrap;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.event.schema.cluster.Node;
+import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
+import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
+import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse.Status;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+
+public class ClusterEventHandler implements Runnable {
+
+    private static final Logger LOGGER = Logger.getLogger(ClusterEventHandler.class.getName());
+
+    private final LinkedBlockingQueue<Set<IClusterManagementWork>> inbox;
+
+    public ClusterEventHandler(LinkedBlockingQueue<Set<IClusterManagementWork>> inbox) {
+        this.inbox = inbox;
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                Set<IClusterManagementWork> workSet = inbox.take();
+                int nodesToAdd = 0;
+                Set<String> nodesToRemove = new HashSet<String>();
+                Set<IClusterManagementWork> nodeAdditionRequests = new HashSet<IClusterManagementWork>();
+                Set<IClusterManagementWork> nodeRemovalRequests = new HashSet<IClusterManagementWork>();
+                for (IClusterManagementWork w : workSet) {
+                    switch (w.getClusterManagementWorkType()) {
+                        case ADD_NODE:
+                            if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodes()) {
+                                nodesToAdd = ((AddNodeWork) w).getNumberOfNodes();
+                            }
+                            nodeAdditionRequests.add(w);
+                            break;
+                        case REMOVE_NODE:
+                            nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved());
+                            nodeRemovalRequests.add(w);
+                            break;
+                    }
+                }
+
+                Set<Node> addedNodes = new HashSet<Node>();
+                for (int i = 0; i < nodesToAdd; i++) {
+                    Node node = AsterixClusterProperties.INSTANCE.getAvailableSubstitutionNode();
+                    if (node != null) {
+                        try {
+                            ClusterManager.INSTANCE.addNode(node);
+                            addedNodes.add(node);
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Added NC at:" + node.getId());
+                            }
+                        } catch (AsterixException e) {
+                            if (LOGGER.isLoggable(Level.WARNING)) {
+                                LOGGER.warning("Unable to add NC at:" + node.getId());
+                            }
+                            e.printStackTrace();
+                        }
+                    } else {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Unable to add NC: no more available nodes");
+                        }
+                    }
+                }
+
+                for (IClusterManagementWork w : nodeAdditionRequests) {
+                    w.getSourceSubscriber().notifyRequestCompletion(
+                            new AddNodeWorkResponse((AddNodeWork) w, Status.SUCCESS, addedNodes));
+                }
+
+            } catch (InterruptedException e) {
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.severe("interruped" + e.getMessage());
+                }
+                throw new IllegalStateException(e);
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.severe("Unexpected exception in handling cluster event" + e.getMessage());
+                }
+            }
+
+        }
+    }
+
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 941733e..4f355f8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -14,33 +14,32 @@
  */
 package edu.uci.ics.asterix.hyracks.bootstrap;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.event.schema.cluster.Node;
 import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
 import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
-import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
-import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
 import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
-import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse.Status;
-import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWork;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
 import edu.uci.ics.hyracks.api.application.IClusterLifecycleListener;
 
 public class ClusterLifecycleListener implements IClusterLifecycleListener {
 
+    private static final Logger LOGGER = Logger.getLogger(ClusterLifecycleListener.class.getName());
+
     public static ClusterLifecycleListener INSTANCE = new ClusterLifecycleListener();
 
     private ClusterLifecycleListener() {
+        Thread t = new Thread(eventHandler);
+        t.start();
     }
 
-    private static final Logger LOGGER = Logger.getLogger(ClusterLifecycleListener.class.getName());
+    private static final LinkedBlockingQueue<Set<IClusterManagementWork>> outbox = new LinkedBlockingQueue<Set<IClusterManagementWork>>();
+    private static ClusterEventHandler eventHandler = new ClusterEventHandler(outbox);
 
     @Override
     public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) {
@@ -64,52 +63,13 @@
             work.addAll(sub.notifyNodeFailure(deadNodeIds));
         }
 
-        int nodesToAdd = 0;
-        Set<String> nodesToRemove = new HashSet<String>();
-        Set<IClusterManagementWork> nodeAdditionRequests = new HashSet<IClusterManagementWork>();
-        Set<IClusterManagementWork> nodeRemovalRequests = new HashSet<IClusterManagementWork>();
-        for (IClusterManagementWork w : work) {
-            switch (w.getClusterManagementWorkType()) {
-                case ADD_NODE:
-                    if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodes()) {
-                        nodesToAdd = ((AddNodeWork) w).getNumberOfNodes();
-                    }
-                    nodeAdditionRequests.add(w);
-                    break;
-                case REMOVE_NODE:
-                    nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved());
-                    nodeRemovalRequests.add(w);
-                    break;
+        try {
+            outbox.put(work);
+        } catch (InterruptedException e) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Interrupted :" + e.getMessage());
             }
         }
 
-        Set<Node> addedNodes = new HashSet<Node>();
-        for (int i = 0; i < nodesToAdd; i++) {
-            Node node = AsterixClusterProperties.INSTANCE.getAvailableSubstitutionNode();
-            if (node != null) {
-                try {
-                    ClusterManager.INSTANCE.addNode(node);
-                    addedNodes.add(node);
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Added NC at:" + node.getId());
-                    }
-                } catch (AsterixException e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Unable to add NC at:" + node.getId());
-                    }
-                    e.printStackTrace();
-                }
-            } else {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Unable to add NC: no more available nodes");
-                }
-            }
-        }
-
-        for (IClusterManagementWork w : nodeAdditionRequests) {
-            w.getSourceSubscriber().notifyRequestCompletion(
-                    new AddNodeWorkResponse((AddNodeWork) w, Status.SUCCESS, addedNodes));
-        }
-
     }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
index bde860f..6948dbc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
@@ -23,13 +23,16 @@
 
     protected final IClusterEventsSubscriber subscriber;
 
+    protected final int workId;
+
     @Override
     public int getWorkId() {
-        return WorkIdGenerator.getNextWorkId();
+        return workId;
     }
 
     public AbstractClusterManagementWork(IClusterEventsSubscriber subscriber) {
         this.subscriber = subscriber;
+        this.workId = WorkIdGenerator.getNextWorkId();
     }
 
     private static class WorkIdGenerator {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
index e720c9f..5970978 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
@@ -13,10 +13,12 @@
         this.status = status;
     }
 
+    @Override
     public IClusterManagementWork getWork() {
         return work;
     }
 
+    @Override
     public Status getStatus() {
         return status;
     }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
index ae3ce7e..1ac9e34 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
@@ -1,11 +1,22 @@
 package edu.uci.ics.asterix.metadata.cluster;
 
+import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+
 public interface IClusterManagementWorkResponse {
 
     public enum Status {
         SUCCESS,
         FAILURE
     }
-    
-    
+
+    /**
+     * @return
+     */
+    public IClusterManagementWork getWork();
+
+    /**
+     * @return
+     */
+    public Status getStatus();
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 8c03198..9544abd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -65,6 +65,7 @@
 import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.types.ARecordType;
@@ -443,10 +444,10 @@
             }
 
             Map<String, Object> configuration = this.wrapProperties(datasetDetails.getProperties());
-       
+
             switch (adapterFactory.getAdapterType()) {
                 case TYPED:
-                    adapterOutputType = null;
+                    adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
                     break;
                 case GENERIC:
                     String outputTypeName = datasetDetails.getProperties().get("output-type-name");
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedJobLifecycleListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedJobLifecycleListener.java
index 3a981b2..2613aad 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedJobLifecycleListener.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedJobLifecycleListener.java
@@ -186,7 +186,7 @@
             try {
                 IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
                 JobInfo info = hcc.getJobInfo(message.jobId);
-
+                feedInfo.jobInfo = info;
                 Map<String, String> feedActivityDetails = new HashMap<String, String>();
                 StringBuilder ingestLocs = new StringBuilder();
                 for (OperatorDescriptorId ingestOpId : ingestOperatorIds) {
@@ -279,6 +279,7 @@
         public JobSpecification jobSpec;
         public List<String> ingestLocations = new ArrayList<String>();
         public List<String> computeLocations = new ArrayList<String>();
+        public JobInfo jobInfo;
 
         public FeedInfo(FeedId feedId, JobSpecification jobSpec) {
             this.feedId = feedId;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
index bd154e6..257c8fd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
@@ -24,27 +24,40 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.event.schema.cluster.Node;
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
 import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
 import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
 import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
 import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailure.FailureType;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -61,10 +74,13 @@
 
     private static final long serialVersionUID = 1L;
 
+    private static final Logger LOGGER = Logger.getLogger(FeedLifecycleListener.class.getName());
+
     public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
 
     private LinkedBlockingQueue<Message> jobEventInbox;
     private LinkedBlockingQueue<FeedFailureReport> failureEventInbox;
+    private Map<Integer, FeedFailureReport> feedsWaitingForResponse = new HashMap<Integer, FeedFailureReport>();
 
     private FeedLifecycleListener() {
         jobEventInbox = new LinkedBlockingQueue<Message>();
@@ -216,7 +232,7 @@
             try {
                 IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
                 JobInfo info = hcc.getJobInfo(message.jobId);
-
+                feedInfo.jobInfo = info;
                 Map<String, String> feedActivityDetails = new HashMap<String, String>();
                 StringBuilder ingestLocs = new StringBuilder();
                 for (OperatorDescriptorId ingestOpId : ingestOperatorIds) {
@@ -309,6 +325,7 @@
         public JobSpecification jobSpec;
         public List<String> ingestLocations = new ArrayList<String>();
         public List<String> computeLocations = new ArrayList<String>();
+        public JobInfo jobInfo;
 
         public FeedInfo(FeedId feedId, JobSpecification jobSpec) {
             this.feedId = feedId;
@@ -341,6 +358,7 @@
                 }
             }
         }
+
         return handleFailure(failureReport);
     }
 
@@ -374,6 +392,7 @@
 
         AddNodeWork addNodesWork = new AddNodeWork(failureMap.keySet().size(), this);
         work.add(addNodesWork);
+        feedsWaitingForResponse.put(addNodesWork.getWorkId(), failureReport);
         return work;
     }
 
@@ -402,7 +421,133 @@
 
     @Override
     public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
-        // TODO Auto-generated method stub
+        IClusterManagementWork submittedWork = response.getWork();
+        switch (submittedWork.getClusterManagementWorkType()) {
+            case ADD_NODE:
+                AddNodeWorkResponse resp = (AddNodeWorkResponse) response;
+                switch (resp.getStatus()) {
+                    case FAILURE:
+                        break;
+                    case SUCCESS:
+                        AddNodeWork work = (AddNodeWork) submittedWork;
+                        FeedFailureReport failureReport = feedsWaitingForResponse.remove(work.getWorkId());
+                        Set<FeedInfo> affectedFeeds = failureReport.failures.keySet();
+                        for (FeedInfo feedInfo : affectedFeeds) {
+                            try {
+                                recoverFeed(feedInfo, resp, failureReport.failures.get(feedInfo));
+                                if (LOGGER.isLoggable(Level.INFO)) {
+                                    LOGGER.info("Recovered feed:" + feedInfo);
+                                }
+                            } catch (Exception e) {
+                                if (LOGGER.isLoggable(Level.SEVERE)) {
+                                    LOGGER.severe("Unable to recover feed:" + feedInfo);
+                                }
+                            }
+                        }
+                        break;
+                }
+                resp.getNodesAdded();
+                break;
+            case REMOVE_NODE:
+                break;
+        }
+    }
+
+    private void recoverFeed(FeedInfo feedInfo, AddNodeWorkResponse resp, List<FeedFailure> feedFailures)
+            throws Exception {
+        for (FeedFailure feedFailure : feedFailures) {
+            switch (feedFailure.failureType) {
+                case INGESTION_NODE:
+                    alterFeedJobSpec(feedInfo, resp, feedFailure.nodeId);
+                    break;
+            }
+        }
+        JobSpecification spec = feedInfo.jobSpec;
+        AsterixAppContextInfo.getInstance().getHcc().startJob(feedInfo.jobSpec);
+    }
+
+    private void alterFeedJobSpec(FeedInfo feedInfo, AddNodeWorkResponse resp, String failedNodeId) {
+        Random r = new Random();
+        Object[] rnodes = resp.getNodesAdded().toArray();
+        Node replacementNode = (Node) rnodes[r.nextInt(rnodes.length)];
+        String replacementNodeId = AsterixClusterProperties.INSTANCE.getCluster().getInstanceName() + "_"
+                + replacementNode.getId();
+        Map<OperatorDescriptorId, IOperatorDescriptor> opMap = feedInfo.jobSpec.getOperatorMap();
+        Set<Constraint> userConstraints = feedInfo.jobSpec.getUserConstraints();
+        List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
+        List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
+        List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
+        Map<OperatorDescriptorId, List<Constraint>> candidateConstraints = new HashMap<OperatorDescriptorId, List<Constraint>>();
+        Map<OperatorDescriptorId, List<String>> newConstraints = new HashMap<OperatorDescriptorId, List<String>>();
+        OperatorDescriptorId opId = null;
+        for (Constraint constraint : userConstraints) {
+            LValueConstraintExpression lexpr = constraint.getLValue();
+            ConstraintExpression cexpr = constraint.getRValue();
+            switch (lexpr.getTag()) {
+                case PARTITION_COUNT:
+                    opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+                    if (modifiedOperators.contains(opId)) {
+                        countConstraintsToReplace.add(constraint);
+                    } else {
+                        List<Constraint> clist = candidateConstraints.get(opId);
+                        if (clist == null) {
+                            clist = new ArrayList<Constraint>();
+                            candidateConstraints.put(opId, clist);
+                        }
+                        clist.add(constraint);
+                    }
+                    break;
+                case PARTITION_LOCATION:
+                    opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+                    String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+                    if (oldLocation.equals(failedNodeId)) {
+                        locationConstraintsToReplace.add(constraint);
+                        modifiedOperators.add(((PartitionLocationExpression) lexpr).getOperatorDescriptorId());
+                        List<String> newLocs = newConstraints.get(opId);
+                        if (newLocs == null) {
+                            newLocs = new ArrayList<String>();
+                            newConstraints.put(opId, newLocs);
+                        }
+                        newLocs.add(replacementNodeId);
+                    } else {
+                        if (modifiedOperators.contains(opId)) {
+                            locationConstraintsToReplace.add(constraint);
+                            List<String> newLocs = newConstraints.get(opId);
+                            if (newLocs == null) {
+                                newLocs = new ArrayList<String>();
+                                newConstraints.put(opId, newLocs);
+                            }
+                            newLocs.add(oldLocation);
+                        } else {
+                            List<Constraint> clist = candidateConstraints.get(opId);
+                            if (clist == null) {
+                                clist = new ArrayList<Constraint>();
+                                candidateConstraints.put(opId, clist);
+                            }
+                            clist.add(constraint);
+                        }
+                    }
+                    break;
+            }
+        }
+
+        feedInfo.jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
+        feedInfo.jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
+
+        for (OperatorDescriptorId mopId : modifiedOperators) {
+            List<Constraint> clist = candidateConstraints.get(mopId);
+            if (clist != null && !clist.isEmpty()) {
+                feedInfo.jobSpec.getUserConstraints().removeAll(clist);
+            }
+        }
+
+        for (Entry<OperatorDescriptorId, List<String>> entry : newConstraints.entrySet()) {
+            OperatorDescriptorId nopId = entry.getKey();
+            List<String> clist = entry.getValue();
+            IOperatorDescriptor op = feedInfo.jobSpec.getOperatorMap().get(nopId);
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(feedInfo.jobSpec, op,
+                    clist.toArray(new String[] {}));
+        }
 
     }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
new file mode 100644
index 0000000..a871aba
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+
+public interface ITypedAdapterFactory extends IAdapterFactory {
+
+    public ARecordType getAdapterOutputType();
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
index fe4212d..4742b16 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
@@ -38,25 +38,11 @@
     private static final long serialVersionUID = 1L;
     private Map<String, Object> configuration;
 
-    public SyntheticTwitterFeedAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx)
-            throws AsterixException {
+    public SyntheticTwitterFeedAdapter(Map<String, Object> configuration, ARecordType outputType,
+            IHyracksTaskContext ctx) throws AsterixException {
         super(configuration, ctx);
         this.configuration = configuration;
-
-        String[] userFieldNames = new String[] { "screen-name", "lang", "friends_count", "statuses_count", "name",
-                "followers_count" };
-
-        IAType[] userFieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32,
-                BuiltinType.AINT32, BuiltinType.ASTRING, BuiltinType.AINT32 };
-        ARecordType userRecordType = new ARecordType("TwitterUserType", userFieldNames, userFieldTypes, false);
-
-        String[] fieldNames = new String[] { "tweetid", "user", "sender-location", "send-time", "referred-topics",
-                "message-text" };
-
-        AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, "referred-topics");
-        IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, userRecordType, BuiltinType.APOINT,
-                BuiltinType.ADATETIME, unorderedListType, BuiltinType.ASTRING };
-        adapterOutputType = new ARecordType("TweetMessageType", fieldNames, fieldTypes, false);
+        this.adapterOutputType = outputType;
     }
 
     @Override
@@ -64,7 +50,6 @@
         return new SyntheticTwitterFeedClient(configuration, adapterOutputType, partition);
     }
 
-  
     private static class SyntheticTwitterFeedClient extends PullBasedFeedClient implements IPullBasedFeedClient {
 
         private static final Logger LOGGER = Logger.getLogger(SyntheticTwitterFeedClient.class.getName());
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
index 48b1d20..3d05870 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -14,11 +14,28 @@
  */
 package edu.uci.ics.asterix.tools.external.data;
 
+import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.AdapterType;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
@@ -28,7 +45,7 @@
  * on the local file system or on HDFS. The feed ends when the content of the
  * source file has been ingested.
  */
-public class SyntheticTwitterFeedAdapterFactory implements IAdapterFactory {
+public class SyntheticTwitterFeedAdapterFactory implements ITypedAdapterFactory {
 
     /**
      * 
@@ -37,6 +54,10 @@
 
     private Map<String, Object> configuration;
 
+    private static final String KEY_DATAVERSE_DATASET = "dataverse-dataset";
+
+    private static final ARecordType outputType = initOutputType();
+
     @Override
     public String getName() {
         return "synthetic_twitter_feed";
@@ -59,12 +80,61 @@
 
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(1);
+        String dvds = (String) configuration.get(KEY_DATAVERSE_DATASET);
+        String[] components = dvds.split(":");
+        String dataverse = components[0];
+        String dataset = components[1];
+        MetadataTransactionContext ctx = null;
+        NodeGroup ng = null;
+        try {
+            ctx = MetadataManager.INSTANCE.beginTransaction();
+            Dataset ds = MetadataManager.INSTANCE.getDataset(ctx, dataverse, dataset);
+            String nodegroupName = ((FeedDatasetDetails) ds.getDatasetDetails()).getNodeGroupName();
+            ng = MetadataManager.INSTANCE.getNodegroup(ctx, nodegroupName);
+            MetadataManager.INSTANCE.commitTransaction(ctx);
+        } catch (Exception e) {
+            MetadataManager.INSTANCE.abortTransaction(ctx);
+            throw e;
+        }
+        List<String> storageNodes = ng.getNodeNames();
+        Set<String> nodes = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
+        nodes.removeAll(storageNodes);
+        Random r = new Random();
+        String ingestionLocation = nodes.toArray(new String[] {})[r.nextInt(nodes.size())];
+        return new AlgebricksAbsolutePartitionConstraint(new String[] { ingestionLocation });
     }
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
-        return new SyntheticTwitterFeedAdapter(configuration, ctx);
+        return new SyntheticTwitterFeedAdapter(configuration, outputType, ctx);
     }
 
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return outputType;
+    }
+
+    private static ARecordType initOutputType() {
+        ARecordType outputType = null;
+        try {
+            String[] userFieldNames = new String[] { "screen-name", "lang", "friends_count", "statuses_count", "name",
+                    "followers_count" };
+
+            IAType[] userFieldTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32,
+                    BuiltinType.AINT32, BuiltinType.ASTRING, BuiltinType.AINT32 };
+            ARecordType userRecordType = new ARecordType("TwitterUserType", userFieldNames, userFieldTypes, false);
+
+            String[] fieldNames = new String[] { "tweetid", "user", "sender-location", "send-time", "referred-topics",
+                    "message-text" };
+
+            AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, "referred-topics");
+            IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, userRecordType, BuiltinType.APOINT,
+                    BuiltinType.ADATETIME, unorderedListType, BuiltinType.ASTRING };
+            outputType = new ARecordType("TweetMessageType", fieldNames, fieldTypes, false);
+
+        } catch (AsterixException e) {
+            throw new IllegalStateException("Unable to initialize output type");
+        }
+        return outputType;
+    }
 }
\ No newline at end of file