added managix commands for starting/stopping a node
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 91b4abb..2292cdf 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -86,7 +86,9 @@
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -578,6 +580,10 @@
             }
             nodegroupName = dataverse + ":" + dd.getName().getValue();
             MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes));
+            //TODO: Remove this hack. In future we would not mandate metadata node to be one of the 
+            // storage nodes. We require that currently so that ingestion node does not coincide with 
+            // the metadata node.
+            nodeNames.add(metadataNodeName);
         } else {
             nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
         }
@@ -630,6 +636,16 @@
                 }
             }
 
+            if (ds.getDatasetType().equals(DatasetType.FEED)) {
+                FeedActivity fa = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, datasetName,
+                        null);
+                boolean activeFeed = FeedOperations.isFeedActive(fa);
+                if (activeFeed) {
+                    throw new AsterixException("Feed " + datasetName + " is currently " + FeedState.ACTIVE + "."
+                            + " Operation not supported.");
+                }
+            }
+
             //#. add a new index with PendingAddOp
             Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
                     stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
@@ -904,6 +920,16 @@
                 }
             }
 
+            if (ds.getDatasetType().equals(DatasetType.FEED)) {
+                FeedActivity fa = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, datasetName,
+                        null);
+                boolean activeFeed = FeedOperations.isFeedActive(fa);
+                if (activeFeed) {
+                    throw new AsterixException("Feed " + datasetName + " is currently " + FeedState.ACTIVE + "."
+                            + " Operation not supported.");
+                }
+            }
+
             if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
 
                 //#. prepare jobs to drop the datatset and the indexes in NC
@@ -1012,6 +1038,16 @@
                         + dataverseName);
             }
 
+            if (ds.getDatasetType().equals(DatasetType.FEED)) {
+                FeedActivity fa = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, datasetName,
+                        null);
+                boolean activeFeed = FeedOperations.isFeedActive(fa);
+                if (activeFeed) {
+                    throw new AsterixException("Feed " + datasetName + " is currently " + FeedState.ACTIVE + "."
+                            + " Operation not supported.");
+                }
+            }
+
             if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
                 indexName = stmtIndexDrop.getIndexName().getValue();
                 Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
@@ -1380,7 +1416,7 @@
             }
 
             FeedActivity recentActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, bfs
-                    .getDatasetName().getValue());
+                    .getDatasetName().getValue(), null);
             boolean isFeedActive = FeedOperations.isFeedActive(recentActivity);
             if (isFeedActive && !bfs.isForceBegin()) {
                 throw new AsterixException("Feed " + bfs.getDatasetName().getValue()
@@ -1442,7 +1478,7 @@
             }
 
             FeedActivity feedActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName,
-                    datasetName);
+                    datasetName, FeedActivityType.FEED_BEGIN, FeedActivityType.FEED_RESUME);
 
             boolean isFeedActive = FeedOperations.isFeedActive(feedActivity);
             if (!isFeedActive) {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
index a379a24..024e660 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -14,10 +14,6 @@
  */
 package edu.uci.ics.asterix.file;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
@@ -25,10 +21,9 @@
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
 import edu.uci.ics.asterix.metadata.feeds.AlterFeedMessage;
-import edu.uci.ics.asterix.metadata.feeds.FeedId;
 import edu.uci.ics.asterix.metadata.feeds.FeedMessage;
 import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
 import edu.uci.ics.asterix.metadata.feeds.IFeedMessage.MessageType;
@@ -109,9 +104,7 @@
                 feedMessage = new FeedMessage(MessageType.END);
                 break;
             case ALTER:
-                Map<String, Object> wrappedProperties = new HashMap<String, Object>();
-                wrappedProperties.putAll(controlFeedStatement.getProperties());
-                feedMessage = new AlterFeedMessage(wrappedProperties);
+                feedMessage = new AlterFeedMessage(controlFeedStatement.getProperties());
                 break;
         }
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 15e6cd5..1d8ba1d 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -34,6 +34,8 @@
 import edu.uci.ics.asterix.api.common.SessionConfig;
 import edu.uci.ics.asterix.aql.base.Statement;
 import edu.uci.ics.asterix.aql.expression.BeginFeedStatement;
+import edu.uci.ics.asterix.aql.expression.ControlFeedStatement;
+import edu.uci.ics.asterix.aql.expression.ControlFeedStatement.OperationType;
 import edu.uci.ics.asterix.aql.expression.DataverseDecl;
 import edu.uci.ics.asterix.aql.expression.Identifier;
 import edu.uci.ics.asterix.aql.translator.AqlTranslator;
@@ -52,10 +54,10 @@
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
-import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
 import edu.uci.ics.asterix.metadata.feeds.FeedId;
 import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedPolicyAccessor;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties.State;
@@ -66,6 +68,7 @@
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
@@ -87,6 +90,7 @@
 
     private LinkedBlockingQueue<Message> jobEventInbox;
     private LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
+    private Map<FeedInfo, List<String>> dependentFeeds = new HashMap<FeedInfo, List<String>>();
 
     private State state;
 
@@ -127,14 +131,13 @@
         JobSpecification spec = acggf.getJobSpecification();
         boolean feedIngestionJob = false;
         FeedId feedId = null;
-        String feedPolicy = null;
+        Map<String, String> feedPolicy = null;
         for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
             if (!(opDesc instanceof FeedIntakeOperatorDescriptor)) {
                 continue;
             }
             feedId = ((FeedIntakeOperatorDescriptor) opDesc).getFeedId();
-            feedPolicy = ((FeedIntakeOperatorDescriptor) opDesc).getFeedPolicy().get(
-                    BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+            feedPolicy = ((FeedIntakeOperatorDescriptor) opDesc).getFeedPolicy();
             feedIngestionJob = true;
             break;
         }
@@ -191,7 +194,7 @@
             return registeredFeeds.containsKey(jobId);
         }
 
-        public void registerFeed(FeedId feedId, JobId jobId, JobSpecification jobSpec, String feedPolicy) {
+        public void registerFeed(FeedId feedId, JobId jobId, JobSpecification jobSpec, Map<String, String> feedPolicy) {
             if (registeredFeeds.containsKey(jobId)) {
                 throw new IllegalStateException(" Feed already registered ");
             }
@@ -284,15 +287,20 @@
                 feedActivityDetails.put(FeedActivity.FeedActivityDetails.INGEST_LOCATIONS, ingestLocs.toString());
                 feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS, computeLocs.toString());
                 feedActivityDetails.put(FeedActivity.FeedActivityDetails.STORAGE_LOCATIONS, storageLocs.toString());
-                feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, feedInfo.feedPolicy);
-
-                FeedActivity feedActivity = new FeedActivity(feedInfo.feedId.getDataverse(),
-                        feedInfo.feedId.getDataset(), FeedActivityType.FEED_BEGIN, feedActivityDetails);
+                feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME,
+                        feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY));
 
                 MetadataManager.INSTANCE.acquireWriteLatch();
                 MetadataTransactionContext mdTxnCtx = null;
                 try {
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                    FeedActivity fa = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx,
+                            feedInfo.feedId.getDataverse(), feedInfo.feedId.getDataset(), null);
+                    FeedActivityType nextState = fa != null
+                            && fa.getActivityType().equals(FeedActivityType.FEED_RECOVERY) ? FeedActivityType.FEED_RESUME
+                            : FeedActivityType.FEED_BEGIN;
+                    FeedActivity feedActivity = new FeedActivity(feedInfo.feedId.getDataverse(),
+                            feedInfo.feedId.getDataset(), nextState, feedActivityDetails);
                     MetadataManager.INSTANCE.registerFeedActivity(mdTxnCtx, new FeedId(feedInfo.feedId.getDataverse(),
                             feedInfo.feedId.getDataset()), feedActivity);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -353,9 +361,9 @@
         public List<String> computeLocations = new ArrayList<String>();
         public List<String> storageLocations = new ArrayList<String>();
         public JobInfo jobInfo;
-        public String feedPolicy;
+        public Map<String, String> feedPolicy;
 
-        public FeedInfo(FeedId feedId, JobSpecification jobSpec, String feedPolicy) {
+        public FeedInfo(FeedId feedId, JobSpecification jobSpec, Map<String, String> feedPolicy) {
             this.feedId = feedId;
             this.jobSpec = jobSpec;
             this.feedPolicy = feedPolicy;
@@ -385,19 +393,71 @@
                     }
                     failures.add(new FeedFailure(FeedFailure.FailureType.COMPUTE_NODE, deadNodeId));
                 }
+                if (feedInfo.storageLocations.contains(deadNodeId)) {
+                    List<FeedFailure> failures = failureReport.failures.get(feedInfo);
+                    if (failures == null) {
+                        failures = new ArrayList<FeedFailure>();
+                        failureReport.failures.put(feedInfo, failures);
+                    }
+                    failures.add(new FeedFailure(FeedFailure.FailureType.STORAGE_NODE, deadNodeId));
+                }
             }
         }
-
-        return handleFailure(failureReport);
+        if (failureReport.failures.isEmpty()) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                StringBuilder builder = new StringBuilder();
+                builder.append("No feed is affected by the failure of node(s): ");
+                for (String deadNodeId : deadNodeIds) {
+                    builder.append(deadNodeId + " ");
+                }
+                LOGGER.info(builder.toString());
+            }
+            return new HashSet<IClusterManagementWork>();
+        } else {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                StringBuilder builder = new StringBuilder();
+                builder.append("Feed affected by the failure of node(s): ");
+                for (String deadNodeId : deadNodeIds) {
+                    builder.append(deadNodeId + " ");
+                }
+                builder.append("\n");
+                for (FeedInfo fInfo : failureReport.failures.keySet()) {
+                    builder.append(fInfo.feedId);
+                }
+                LOGGER.warning(builder.toString());
+            }
+            return handleFailure(failureReport);
+        }
     }
 
     private Set<IClusterManagementWork> handleFailure(FeedFailureReport failureReport) {
         reportFeedFailure(failureReport);
         Set<IClusterManagementWork> work = new HashSet<IClusterManagementWork>();
         Map<String, Map<FeedInfo, List<FailureType>>> failureMap = new HashMap<String, Map<FeedInfo, List<FailureType>>>();
+        FeedPolicyAccessor fpa = null;
+        List<FeedInfo> feedsToTerminate = new ArrayList<FeedInfo>();
         for (Map.Entry<FeedInfo, List<FeedFailure>> entry : failureReport.failures.entrySet()) {
             FeedInfo feedInfo = entry.getKey();
+            fpa = new FeedPolicyAccessor(feedInfo.feedPolicy);
+            if (!fpa.continueOnHardwareFailure()) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Feed " + feedInfo.feedId + " is governed by policy "
+                            + feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY));
+                    LOGGER.warning("Feed policy does not require feed to recover from hardware failure. Feed will terminate");
+                }
+                continue;
+            } else {
+                // insert feed recovery mode 
+                reportFeedRecoveryMode(feedInfo);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Feed " + feedInfo.feedId + " is governed by policy "
+                            + feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY));
+                    LOGGER.info("Feed policy requires feed to recover from hardware failure. Attempting to recover feed");
+                }
+            }
+
             List<FeedFailure> feedFailures = entry.getValue();
+            boolean recoveryPossible = true;
             for (FeedFailure feedFailure : feedFailures) {
                 switch (feedFailure.failureType) {
                     case COMPUTE_NODE:
@@ -414,20 +474,64 @@
                             failuresBecauseOfThisNode.put(feedInfo, feedF);
                         }
                         feedF.add(feedFailure.failureType);
-
                         break;
                     case STORAGE_NODE:
+                        recoveryPossible = false;
                         if (LOGGER.isLoggable(Level.SEVERE)) {
                             LOGGER.severe("Unrecoverable situation! lost storage node for the feed " + feedInfo.feedId);
                         }
+                        List<String> requiredNodeIds = dependentFeeds.get(feedInfo);
+                        if (requiredNodeIds == null) {
+                            requiredNodeIds = new ArrayList<String>();
+                            dependentFeeds.put(feedInfo, requiredNodeIds);
+                        }
+                        requiredNodeIds.add(feedFailure.nodeId);
+                        failuresBecauseOfThisNode = failureMap.get(feedFailure.nodeId);
+                        if (failuresBecauseOfThisNode != null) {
+                            failuresBecauseOfThisNode.remove(feedInfo);
+                            if (failuresBecauseOfThisNode.isEmpty()) {
+                                failureMap.remove(feedFailure.nodeId);
+                            }
+                        }
+                        feedsToTerminate.add(feedInfo);
                         break;
                 }
             }
+            if (!recoveryPossible) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Terminating irrecoverable feed (loss of storage node) ");
+                }
+            }
         }
 
-        AddNodeWork addNodesWork = new AddNodeWork(failureMap.keySet().size(), this);
-        work.add(addNodesWork);
-        feedWorkRequestResponseHandler.registerFeedWork(addNodesWork.getWorkId(), failureReport);
+        if (!feedsToTerminate.isEmpty()) {
+            Thread t = new Thread(new FeedsDeActivator(feedsToTerminate));
+            t.start();
+        }
+
+        int numRequiredNodes = 0;
+        for (Entry<String, Map<FeedInfo, List<FeedFailure.FailureType>>> entry : failureMap.entrySet()) {
+            Map<FeedInfo, List<FeedFailure.FailureType>> v = entry.getValue();
+            for (FeedInfo finfo : feedsToTerminate) {
+                v.remove(finfo);
+            }
+            if (v.size() > 0) {
+                numRequiredNodes++;
+            }
+        }
+
+        if (numRequiredNodes > 0) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Number of additional nodes requested " + numRequiredNodes);
+            }
+            AddNodeWork addNodesWork = new AddNodeWork(failureMap.keySet().size(), this);
+            work.add(addNodesWork);
+            feedWorkRequestResponseHandler.registerFeedWork(addNodesWork.getWorkId(), failureReport);
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Not requesting any new node. Feeds unrecoverable until the lost node(s) rejoin");
+            }
+        }
         return work;
     }
 
@@ -463,6 +567,29 @@
         }
     }
 
+    private void reportFeedRecoveryMode(FeedInfo feedInfo) {
+        MetadataTransactionContext ctx = null;
+        FeedActivity fa = null;
+        Map<String, String> feedActivityDetails = new HashMap<String, String>();
+        try {
+            ctx = MetadataManager.INSTANCE.beginTransaction();
+            fa = new FeedActivity(feedInfo.feedId.getDataverse(), feedInfo.feedId.getDataset(),
+                    FeedActivityType.FEED_RECOVERY, feedActivityDetails);
+            MetadataManager.INSTANCE.registerFeedActivity(ctx, feedInfo.feedId, fa);
+
+            MetadataManager.INSTANCE.commitTransaction(ctx);
+        } catch (Exception e) {
+            if (ctx != null) {
+                try {
+                    MetadataManager.INSTANCE.abortTransaction(ctx);
+                } catch (Exception e2) {
+                    e2.addSuppressed(e);
+                    throw new IllegalStateException("Unable to abort transaction " + e2);
+                }
+            }
+        }
+    }
+
     public static class FeedFailure {
 
         public enum FailureType {
@@ -491,21 +618,39 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(joinedNodeId + " joined the cluster. " + "Asterix state: " + newState);
         }
-        if (!newState.equals(state)) {
-            if (newState == State.ACTIVE) {
+
+        boolean needToReActivateFeeds = !newState.equals(state) && (newState == State.ACTIVE);
+        if (needToReActivateFeeds) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info(joinedNodeId + " Resuming loser feeds (if any)");
+            }
+            try {
+                FeedsActivator activator = new FeedsActivator();
+                (new Thread(activator)).start();
+            } catch (Exception e) {
                 if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info(joinedNodeId + " Resuming loser feeds (if any)");
-                }
-                try {
-                    FeedsActivator activator = new FeedsActivator();
-                    (new Thread(activator)).start();
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Exception in resuming feeds" + e.getMessage());
-                    }
+                    LOGGER.info("Exception in resuming feeds" + e.getMessage());
                 }
             }
             state = newState;
+        } else {
+            List<FeedInfo> feedsThatCanBeRevived = new ArrayList<FeedInfo>();
+            for (Entry<FeedInfo, List<String>> entry : dependentFeeds.entrySet()) {
+                List<String> requiredNodeIds = entry.getValue();
+                if (requiredNodeIds.contains(joinedNodeId)) {
+                    requiredNodeIds.remove(joinedNodeId);
+                    if (requiredNodeIds.isEmpty()) {
+                        feedsThatCanBeRevived.add(entry.getKey());
+                    }
+                }
+            }
+            if (!feedsThatCanBeRevived.isEmpty()) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info(joinedNodeId + " Resuming feeds after rejoining of node " + joinedNodeId);
+                }
+                FeedsActivator activator = new FeedsActivator(feedsThatCanBeRevived);
+                (new Thread(activator)).start();
+            }
         }
         return null;
     }
@@ -542,12 +687,57 @@
 
     private static class FeedsActivator implements Runnable {
 
+        private List<FeedInfo> feedsToRevive;
+        private Mode mode;
+
+        public enum Mode {
+            REVIVAL_POST_CLUSTER_REBOOT,
+            REVIVAL_POST_NODE_REJOIN
+        }
+
+        public FeedsActivator() {
+            this.mode = Mode.REVIVAL_POST_CLUSTER_REBOOT;
+        }
+
+        public FeedsActivator(List<FeedInfo> feedsToRevive) {
+            this.feedsToRevive = feedsToRevive;
+            this.mode = Mode.REVIVAL_POST_NODE_REJOIN;
+        }
+
         @Override
         public void run() {
+            switch (mode) {
+                case REVIVAL_POST_CLUSTER_REBOOT:
+                    revivePostClusterReboot();
+                    break;
+                case REVIVAL_POST_NODE_REJOIN:
+                    try {
+                        Thread.sleep(2000);
+                    } catch (InterruptedException e1) {
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Attempt to resume feed interrupted");
+                        }
+                        throw new IllegalStateException(e1.getMessage());
+                    }
+                    for (FeedInfo finfo : feedsToRevive) {
+                        try {
+                            JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Resumed feed :" + finfo.feedId + " job id " + jobId);
+                                LOGGER.info("Job:" + finfo.jobSpec);
+                            }
+                        } catch (Exception e) {
+                            if (LOGGER.isLoggable(Level.WARNING)) {
+                                LOGGER.warning("Unable to resume feed " + finfo.feedId + " " + e.getMessage());
+                            }
+                        }
+                    }
+            }
+        }
+
+        private void revivePostClusterReboot() {
             MetadataTransactionContext ctx = null;
 
-            SessionConfig pc = new SessionConfig(true, false, false, false, false, false, true, false);
-            PrintWriter writer = new PrintWriter(System.out, true);
             try {
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("Attempting to Resume feeds!");
@@ -558,9 +748,7 @@
                 List<FeedActivity> activeFeeds = MetadataManager.INSTANCE.getActiveFeeds(ctx);
                 MetadataManager.INSTANCE.commitTransaction(ctx);
                 for (FeedActivity fa : activeFeeds) {
-
                     String feedPolicy = fa.getFeedActivityDetails().get(FeedActivityDetails.FEED_POLICY_NAME);
-
                     FeedPolicy policy = MetadataManager.INSTANCE.getFeedPolicy(ctx, fa.getDataverseName(), feedPolicy);
                     if (policy == null) {
                         policy = MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME,
@@ -571,36 +759,27 @@
                                         + fa.getDatasetName() + "." + " Unknown policy :" + feedPolicy);
                             }
                         }
+                        continue;
                     }
 
-                    String dataverse = fa.getDataverseName();
-                    String datasetName = fa.getDatasetName();
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Resuming loser feed: " + dataverse + ":" + datasetName + " using policy "
-                                + feedPolicy);
-                    }
-                    try {
-                        DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
-                        BeginFeedStatement stmt = new BeginFeedStatement(new Identifier(dataverse), new Identifier(
-                                datasetName), feedPolicy, 0);
-                        stmt.setForceBegin(true);
-                        List<Statement> statements = new ArrayList<Statement>();
-                        statements.add(dataverseDecl);
-                        statements.add(stmt);
-                        AqlTranslator translator = new AqlTranslator(statements, writer, pc, DisplayFormat.TEXT);
-                        translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, false);
+                    FeedPolicyAccessor fpa = new FeedPolicyAccessor(policy.getProperties());
+                    if (fpa.autoRestartOnClusterReboot()) {
+                        String dataverse = fa.getDataverseName();
+                        String datasetName = fa.getDatasetName();
                         if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Resumed feed: " + dataverse + ":" + datasetName + " using policy "
-                                    + feedPolicy);
+                            LOGGER.info("Resuming feed after cluster revival: " + dataverse + ":" + datasetName
+                                    + " using policy " + feedPolicy);
                         }
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.info("Exception in resuming loser feed: " + dataverse + ":" + datasetName
-                                    + " using policy " + feedPolicy + " Exception " + e.getMessage());
+                        reviveFeed(dataverse, datasetName, feedPolicy);
+                    } else {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Feed " + fa.getDataverseName() + ":" + fa.getDatasetName()
+                                    + " governed by policy" + feedPolicy
+                                    + " does not state auto restart after cluster revival");
                         }
                     }
                 }
+
             } catch (Exception e) {
                 e.printStackTrace();
                 try {
@@ -612,9 +791,85 @@
                     throw new IllegalStateException(e1);
                 }
             }
+        }
 
+        private void reviveFeed(String dataverse, String dataset, String feedPolicy) {
+            PrintWriter writer = new PrintWriter(System.out, true);
+            SessionConfig pc = new SessionConfig(true, false, false, false, false, false, true, false);
+            try {
+                DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
+                BeginFeedStatement stmt = new BeginFeedStatement(new Identifier(dataverse), new Identifier(dataset),
+                        feedPolicy, 0);
+                stmt.setForceBegin(true);
+                List<Statement> statements = new ArrayList<Statement>();
+                statements.add(dataverseDecl);
+                statements.add(stmt);
+                AqlTranslator translator = new AqlTranslator(statements, writer, pc, DisplayFormat.TEXT);
+                translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, false);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Resumed feed: " + dataverse + ":" + dataset + " using policy " + feedPolicy);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Exception in resuming loser feed: " + dataverse + ":" + dataset + " using policy "
+                            + feedPolicy + " Exception " + e.getMessage());
+                }
+            }
         }
 
     }
 
+    private static class FeedsDeActivator implements Runnable {
+
+        private List<FeedInfo> feedsToTerminate;
+
+        public FeedsDeActivator(List<FeedInfo> feedsToTerminate) {
+            this.feedsToTerminate = feedsToTerminate;
+        }
+
+        @Override
+        public void run() {
+            for (FeedInfo feedInfo : feedsToTerminate) {
+                endFeed(feedInfo);
+            }
+        }
+
+        private void endFeed(FeedInfo feedInfo) {
+            MetadataTransactionContext ctx = null;
+            PrintWriter writer = new PrintWriter(System.out, true);
+            SessionConfig pc = new SessionConfig(true, false, false, false, false, false, true, false);
+            try {
+                ctx = MetadataManager.INSTANCE.beginTransaction();
+                ControlFeedStatement stmt = new ControlFeedStatement(OperationType.END, new Identifier(
+                        feedInfo.feedId.getDataverse()), new Identifier(feedInfo.feedId.getDataset()));
+                List<Statement> statements = new ArrayList<Statement>();
+                DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(feedInfo.feedId.getDataverse()));
+                statements.add(dataverseDecl);
+                statements.add(stmt);
+                AqlTranslator translator = new AqlTranslator(statements, writer, pc, DisplayFormat.TEXT);
+                translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, false);
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("End urecoverable feed: " + feedInfo.feedId.getDataverse() + ":"
+                            + feedInfo.feedId.getDataset());
+                }
+                MetadataManager.INSTANCE.commitTransaction(ctx);
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Exception in ending loser feed: " + feedInfo.feedId + " Exception " + e.getMessage());
+                }
+                e.printStackTrace();
+                try {
+                    MetadataManager.INSTANCE.abortTransaction(ctx);
+                } catch (Exception e2) {
+                    e2.addSuppressed(e);
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.severe("Exception in aborting transaction! System is in inconsistent state");
+                    }
+                }
+
+            }
+
+        }
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index 465a226..6880308 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -20,8 +20,11 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -29,13 +32,14 @@
 /**
  * A factory class for creating the @see {CNNFeedAdapter}.
  */
-public class CNNFeedAdapterFactory implements IAdapterFactory {
+public class CNNFeedAdapterFactory implements ITypedAdapterFactory {
     private static final long serialVersionUID = 1L;
 
-    private Map<String, Object> configuration;
+    private Map<String, String> configuration;
 
     private List<String> feedURLs = new ArrayList<String>();
     private static Map<String, String> topicFeeds = new HashMap<String, String>();
+    private ARecordType recordType;
 
     public static final String KEY_RSS_URL = "topic";
     public static final String KEY_INTERVAL = "interval";
@@ -77,7 +81,7 @@
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
-        CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter(configuration, ctx);
+        CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter(configuration, recordType, ctx);
         return cnnFeedAdapter;
     }
 
@@ -92,13 +96,17 @@
     }
 
     @Override
-    public void configure(Map<String, Object> configuration) throws Exception {
+    public void configure(Map<String, String> configuration) throws Exception {
         this.configuration = configuration;
         String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty == null) {
             throw new IllegalArgumentException("no rss url provided");
         }
         initializeFeedURLs(rssURLProperty);
+        recordType = new ARecordType("FeedRecordType", new String[] { "id", "title", "description", "link" },
+                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+                false);
+
     }
 
     private void initializeFeedURLs(String rssURLProperty) {
@@ -126,8 +134,7 @@
 
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        AlgebricksPartitionConstraint partitionConstraint = new AlgebricksCountPartitionConstraint(feedURLs.size());
-        return new AlgebricksCountPartitionConstraint(1);
+        return new AlgebricksCountPartitionConstraint(feedURLs.size());
     }
 
     @Override
@@ -135,4 +142,9 @@
         return SupportedOperation.READ;
     }
 
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return recordType;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index d50ea72..af057c9 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -24,6 +24,9 @@
 
 import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
@@ -40,7 +43,7 @@
  * A factory class for creating an instance of HDFSAdapter
  */
 @SuppressWarnings("deprecation")
-public class HDFSAdapterFactory extends StreamBasedAdapterFactory {
+public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String HDFS_ADAPTER_NAME = "hdfs";
@@ -99,7 +102,7 @@
         return HDFS_ADAPTER_NAME;
     }
 
-    private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+    private JobConf configureJobConf(Map<String, String> configuration) throws Exception {
         JobConf conf = new JobConf();
         conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
         conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
@@ -124,7 +127,7 @@
     }
 
     @Override
-    public void configure(Map<String, Object> configuration) throws Exception {
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
         if (!initialized) {
             hdfsScheduler = initializeHDFSScheduler();
             initialized = true;
@@ -144,7 +147,7 @@
         Arrays.fill(executed, false);
         configured = true;
 
-        atype = (IAType) configuration.get(KEY_SOURCE_DATATYPE);
+        atype = (IAType) outputType;
         configureFormat(atype);
     }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index b29e150..991dadb 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -23,10 +23,11 @@
 
 import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.AdapterType;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -38,7 +39,7 @@
  * A factory class for creating an instance of HiveAdapter
  */
 @SuppressWarnings("deprecation")
-public class HiveAdapterFactory extends StreamBasedAdapterFactory {
+public class HiveAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String HDFS_ADAPTER_NAME = "hdfs";
@@ -106,32 +107,32 @@
     }
 
     @Override
-    public void configure(Map<String, Object> configuration) throws Exception {
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
         if (!configured) {
             /** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
             configureJobConf(configuration);
             JobConf conf = configureJobConf(configuration);
             confFactory = new ConfFactory(conf);
 
-            clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+            clusterLocations = AsterixClusterProperties.INSTANCE.getClusterLocations();
             int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
 
             InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
             inputSplitsFactory = new InputSplitsFactory(inputSplits);
 
-            Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+            Scheduler scheduler = HDFSAdapterFactory.hdfsScheduler;
             readSchedule = scheduler.getLocationConstraints(inputSplits);
             executed = new boolean[readSchedule.length];
             Arrays.fill(executed, false);
 
-            atype = (IAType) configuration.get(KEY_SOURCE_DATATYPE);
+            atype = (IAType) outputType;
             configureFormat(atype);
             configured = true;
         }
 
     }
 
-    private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+    private JobConf configureJobConf(Map<String, String> configuration) throws Exception {
         JobConf conf = new JobConf();
 
         /** configure hive */
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 5f23f96..7ea5d18 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -20,6 +20,8 @@
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -32,7 +34,7 @@
  * NCFileSystemAdapter reads external data residing on the local file system of
  * an NC.
  */
-public class NCFileSystemAdapterFactory extends StreamBasedAdapterFactory {
+public class NCFileSystemAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
@@ -62,10 +64,10 @@
     }
 
     @Override
-    public void configure(Map<String, Object> configuration) throws Exception {
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
         this.configuration = configuration;
         String[] splits = ((String) configuration.get(KEY_PATH)).split(",");
-        IAType sourceDatatype = (IAType) configuration.get(KEY_SOURCE_DATATYPE);
+        IAType sourceDatatype = (IAType) outputType;
         configureFileSplits(splits);
         configureFormat(sourceDatatype);
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index 5937013..9c137e1 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -17,8 +17,11 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.external.dataset.adapter.PullBasedTwitterAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -28,11 +31,12 @@
  * This adapter provides the functionality of fetching tweets from Twitter service
  * via pull-based Twitter API.
  */
-public class PullBasedTwitterAdapterFactory implements IAdapterFactory {
+public class PullBasedTwitterAdapterFactory implements ITypedAdapterFactory {
     private static final long serialVersionUID = 1L;
     public static final String PULL_BASED_TWITTER_ADAPTER_NAME = "pull_twitter";
 
-    private Map<String, Object> configuration;
+    private Map<String, String> configuration;
+    private static ARecordType recordType;
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
@@ -55,8 +59,18 @@
     }
 
     @Override
-    public void configure(Map<String, Object> configuration) throws Exception {
+    public void configure(Map<String, String> configuration) throws Exception {
         this.configuration = configuration;
+        if (recordType != null) {
+            String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
+            IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+                    BuiltinType.ASTRING };
+            try {
+                recordType = new ARecordType("FeedRecordType", fieldNames, fieldTypes, false);
+            } catch (Exception e) {
+                throw new IllegalStateException("Unable to create adapter output type");
+            }
+        }
     }
 
     @Override
@@ -64,4 +78,9 @@
         return new AlgebricksCountPartitionConstraint(1);
     }
 
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return recordType;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index d904af9..cc366f6 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
 import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -32,20 +33,20 @@
  * Factory class for creating an instance of @see {RSSFeedAdapter}.
  * RSSFeedAdapter provides the functionality of fetching an RSS based feed.
  */
-public class RSSFeedAdapterFactory implements IAdapterFactory {
+public class RSSFeedAdapterFactory implements ITypedAdapterFactory {
     private static final long serialVersionUID = 1L;
     public static final String RSS_FEED_ADAPTER_NAME = "rss_feed";
 
     public static final String KEY_RSS_URL = "url";
     public static final String KEY_INTERVAL = "interval";
 
-    private Map<String, Object> configuration;
+    private Map<String, String> configuration;
     private ARecordType recordType;
     private List<String> feedURLs = new ArrayList<String>();
 
     @Override
     public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
-        RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter(configuration, ctx);
+        RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter(configuration, recordType, ctx);
         return rssFeedAdapter;
     }
 
@@ -65,7 +66,7 @@
     }
 
     @Override
-    public void configure(Map<String, Object> configuration) throws Exception {
+    public void configure(Map<String, String> configuration) throws Exception {
         this.configuration = configuration;
         String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty == null) {
@@ -96,4 +97,9 @@
 
     }
 
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return recordType;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
index 1287de4..8ddc515 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
@@ -31,7 +31,7 @@
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(StreamBasedAdapterFactory.class.getName());
 
-    protected Map<String, Object> configuration;
+    protected Map<String, String> configuration;
     protected static INodeResolver nodeResolver;
     private static final INodeResolver DEFAULT_NODE_RESOLVER = new DNSResolverFactory().createNodeResolver();
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index 5debc53..017b511 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -19,6 +19,7 @@
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
@@ -27,10 +28,11 @@
  */
 public class CNNFeedAdapter extends RSSFeedAdapter implements IDatasourceAdapter, IFeedAdapter {
 
-    private static final long serialVersionUID = 2523303758114582251L;
+    private static final long serialVersionUID = 1L;
 
-    public CNNFeedAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx) throws AsterixException {
-        super(configuration, ctx);
+    public CNNFeedAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
+            throws AsterixException {
+        super(configuration, recordType, ctx);
     }
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
index e8aadff..ee28c3a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
@@ -51,7 +51,7 @@
     /**
      * @param configuration
      */
-    public boolean alter(Map<String, Object> configuration);
+    public boolean alter(Map<String, String> configuration);
 
     public void stop();
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index e162ffe..4022584 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -48,14 +48,14 @@
     private ByteBuffer frame;
     protected boolean continueIngestion = true;
     protected boolean alterRequested = false;
-    private Map<String, Object> modifiedConfiguration = null;
+    private Map<String, String> modifiedConfiguration = null;
     private long tupleCount = 0;
     private final IHyracksTaskContext ctx;
-    protected Map<String, Object> configuration;
+    protected Map<String, String> configuration;
 
     public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
 
-    public PullBasedAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx) {
+    public PullBasedAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) {
         this.ctx = ctx;
         this.configuration = configuration;
     }
@@ -64,7 +64,7 @@
         return tupleCount;
     }
 
-    public void alter(Map<String, Object> modifedConfiguration) {
+    public void alter(Map<String, String> modifedConfiguration) {
         this.modifiedConfiguration = modifedConfiguration;
     }
 
@@ -146,7 +146,7 @@
         timer.cancel();
     }
 
-    public Map<String, Object> getConfiguration() {
+    public Map<String, String> getConfiguration() {
         return configuration;
     }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index a9820b7..41143d8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -43,12 +43,8 @@
         return tweetClient;
     }
 
-    public PullBasedTwitterAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx) throws AsterixException {
+    public PullBasedTwitterAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) throws AsterixException {
         super(configuration, ctx);
-        String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
-        IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-                BuiltinType.ASTRING };
-        recordType = new ARecordType("FeedRecordType", fieldNames, fieldTypes, false);
         this.ctx = ctx;
         tweetClient = new PullBasedTwitterFeedClient(ctx, this);
     }
@@ -59,7 +55,7 @@
     }
 
     @Override
-    public void alter(Map<String, Object> properties) {
+    public void alter(Map<String, String> properties) {
         alterRequested = true;
     }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 1c260c9..7a5aeea 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -100,7 +100,7 @@
     }
 
     @Override
-    public boolean alter(Map<String, Object> configuration) {
+    public boolean alter(Map<String, String> configuration) {
         // TODO Auto-generated method stub
         return false;
     }
@@ -110,7 +110,7 @@
         // TODO Auto-generated method stub
     }
 
-    private void initialize(Map<String, Object> params) {
+    private void initialize(Map<String, String> params) {
         this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
         this.requestInterval = Integer.parseInt((String) params.get(PullBasedTwitterAdapter.INTERVAL));
         this.query = new Query(keywords);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index bd503f2..102482d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -22,8 +22,6 @@
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
@@ -36,22 +34,21 @@
     private List<String> feedURLs = new ArrayList<String>();
     private boolean isStopRequested = false;
     private boolean isAlterRequested = false;
-    private Map<String, Object> alteredParams = new HashMap<String, Object>();
+    private Map<String, String> alteredParams = new HashMap<String, String>();
     private String id_prefix = "";
-    private ARecordType recordType;
 
     private IPullBasedFeedClient rssFeedClient;
+    private ARecordType recordType;
 
     public boolean isStopRequested() {
         return isStopRequested;
     }
 
-    public RSSFeedAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx) throws AsterixException {
+    public RSSFeedAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
+            throws AsterixException {
         super(configuration, ctx);
-        recordType = new ARecordType("FeedRecordType", new String[] { "id", "title", "description", "link" },
-                new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
-                false);
         id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
+        this.recordType = recordType;
     }
 
     public void setStopRequested(boolean isStopRequested) {
@@ -59,7 +56,7 @@
     }
 
     @Override
-    public void alter(Map<String, Object> properties) {
+    public void alter(Map<String, String> properties) {
         isAlterRequested = true;
         this.alteredParams = properties;
         reconfigure(properties);
@@ -78,8 +75,8 @@
         }
     }
 
-    protected void reconfigure(Map<String, Object> arguments) {
-        String rssURLProperty = (String) configuration.get("KEY_RSS_URL");
+    protected void reconfigure(Map<String, String> arguments) {
+        String rssURLProperty = configuration.get("KEY_RSS_URL");
         if (rssURLProperty != null) {
             initializeFeedURLs(rssURLProperty);
         }
@@ -89,7 +86,7 @@
         return isAlterRequested;
     }
 
-    public Map<String, Object> getAlteredParams() {
+    public Map<String, String> getAlteredParams() {
         return alteredParams;
     }
 
@@ -101,7 +98,7 @@
         return rssFeedClient;
     }
 
-    public ARecordType getAdapterOutputType() {
+    public ARecordType getRecordType() {
         return recordType;
     }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
index 531eb3d..8a4b301 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
@@ -76,7 +76,7 @@
         fetcher.addFetcherEventListener(listener);
         mutableFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableString(null),
                 new AMutableString(null) };
-        recordType = adapter.getAdapterOutputType();
+        recordType = adapter.getRecordType();
         mutableRecord = new AMutableRecord(recordType, mutableFields);
         tupleFieldValues = new String[recordType.getFieldNames().length];
     }
@@ -140,7 +140,7 @@
     }
 
     @Override
-    public boolean alter(Map<String, Object> configuration) {
+    public boolean alter(Map<String, String> configuration) {
         // TODO Auto-generated method stub
         return false;
     }
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CommandHandler.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CommandHandler.java
index 2f26547..45cc9f7 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CommandHandler.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CommandHandler.java
@@ -67,6 +67,17 @@
             case HELP:
                 cmd = new HelpCommand();
                 break;
+            case STOPNODE:
+                cmd = new StopNodeCommand();
+                break;
+            case STARTNODE:
+                cmd = new StartNodeCommand();
+                break;
+            case VERSION:
+                cmd = new VersionCommand();
+                break;
+            default:
+                break;
         }
         cmd.execute(args);
     }
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/HelpCommand.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/HelpCommand.java
index b12b167..68f8532 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/HelpCommand.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/HelpCommand.java
@@ -66,6 +66,15 @@
             case LOG:
                 helpMessage = new LogCommand().getUsageDescription();
                 break;
+            case STOPNODE:
+                helpMessage = new StopNodeCommand().getUsageDescription();
+                break;
+            case STARTNODE:
+                helpMessage = new StartNodeCommand().getUsageDescription();
+                break;
+            case VERSION:
+                helpMessage = new VersionCommand().getUsageDescription();
+                break;
             default:
                 helpMessage = "Unknown command " + command;
         }
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ICommand.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ICommand.java
index 7c68d1d..288c882 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ICommand.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ICommand.java
@@ -31,7 +31,10 @@
         UNINSTALL,
         LOG,
         SHUTDOWN,
-        HELP
+        HELP,
+        STOPNODE,
+        STARTNODE,
+        VERSION
     }
 
     public void execute(String args[]) throws Exception;
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java
index 0e0e6dd..53ec136 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java
@@ -48,7 +48,6 @@
         String eventHome = managixHome + File.separator + MANAGIX_INTERNAL_DIR;
         AsterixEventService.initialize(conf, asterixDir, eventHome);
 
-        
         ILookupService lookupService = ServiceProvider.INSTANCE.getLookupService();
         if (ensureLookupServiceIsRunning && !lookupService.isRunning(conf)) {
             lookupService.startService(conf);
@@ -103,6 +102,8 @@
                 + " Produce a tar archive contianing log files from the master and worker nodes" + "\n");
         buffer.append("shutdown " + ":" + " Shutdown the installer service" + "\n");
         buffer.append("help     " + ":" + " Provides usage description of a command" + "\n");
+        buffer.append("version  " + ":" + " Provides version of Asterix/Managix" + "\n");
+
         buffer.append("\nTo get more information about a command, use managix help -cmd <command>");
         LOGGER.info(buffer.toString());
     }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index 4e5813b..9132c63 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -16,7 +16,6 @@
 package edu.uci.ics.asterix.metadata;
 
 import java.rmi.RemoteException;
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -33,6 +32,7 @@
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
@@ -617,12 +617,13 @@
     }
 
     @Override
-    public FeedActivity getRecentFeedActivity(MetadataTransactionContext ctx, String dataverseName, String datasetName)
-            throws MetadataException {
+    public FeedActivity getRecentFeedActivity(MetadataTransactionContext ctx, String dataverseName, String datasetName,
+            FeedActivityType... feedActivityTypes) throws MetadataException {
 
         FeedActivity feedActivity = null;
         try {
-            feedActivity = metadataNode.getRecentFeedActivity(ctx.getJobId(), new FeedId(dataverseName, datasetName));
+            feedActivity = metadataNode.getRecentFeedActivity(ctx.getJobId(), new FeedId(dataverseName, datasetName),
+                    feedActivityTypes);
         } catch (RemoteException e) {
             throw new MetadataException(e);
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 01e9df6..ff3909a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -46,6 +46,7 @@
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
@@ -1254,7 +1255,8 @@
     }
 
     @Override
-    public FeedActivity getRecentFeedActivity(JobId jobId, FeedId feedId) throws MetadataException, RemoteException {
+    public FeedActivity getRecentFeedActivity(JobId jobId, FeedId feedId, FeedActivityType... activityType)
+            throws MetadataException, RemoteException {
         try {
             ITupleReference searchKey = createTuple(feedId.getDataverse(), feedId.getDataset());
             FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(false);
@@ -1264,7 +1266,17 @@
             searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
             if (!results.isEmpty()) {
                 Collections.sort(results);
-                return results.get(results.size() - 1);
+                if (activityType == null) {
+                    return results.get(0);
+                } else {
+                    for (FeedActivity result : results) {
+                        for (FeedActivityType ft : activityType) {
+                            if (result.getActivityType().equals(ft)) {
+                                return result;
+                            }
+                        }
+                    }
+                }
             }
             return null;
         } catch (Exception e) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index 8623dbc..8befe1f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
@@ -461,8 +462,8 @@
      * @return
      * @throws MetadataException
      */
-    public FeedActivity getRecentFeedActivity(MetadataTransactionContext ctx, String dataverseName, String datasetName)
-            throws MetadataException;
+    public FeedActivity getRecentFeedActivity(MetadataTransactionContext ctx, String dataverseName, String datasetName,
+            FeedActivityType... activityTypeFilter) throws MetadataException;
 
     /**
      * @param ctx
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index 3d8a0fe..3c2e241 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -18,7 +18,6 @@
 import java.io.Serializable;
 import java.rmi.Remote;
 import java.rmi.RemoteException;
-import java.util.Collection;
 import java.util.List;
 
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -30,6 +29,7 @@
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
@@ -484,7 +484,8 @@
      * @throws MetadataException
      * @throws RemoteException
      */
-    public FeedActivity getRecentFeedActivity(JobId jobId, FeedId feedId) throws MetadataException, RemoteException;
+    public FeedActivity getRecentFeedActivity(JobId jobId, FeedId feedId, FeedActivityType... feedActivityFilter)
+            throws MetadataException, RemoteException;
 
     public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException;
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 3d3bcac..4eeb072 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -64,6 +64,7 @@
 import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
 import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
@@ -164,7 +165,6 @@
     private final AsterixStorageProperties storageProperties;
 
     private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
-    public static transient String SCHEDULER = "hdfs-scheduler";
 
     public String getPropertyValue(String propertyName) {
         return config.get(propertyName);
@@ -358,9 +358,16 @@
                 adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
             }
 
-            Map<String, Object> configuration = this.wrapProperties(datasetDetails.getProperties());
-            configuration.put("source-datatype", itemType);
-            adapterFactory.configure(configuration);
+            Map<String, String> configuration = datasetDetails.getProperties();
+
+            switch (adapterFactory.getAdapterType()) {
+                case GENERIC:
+                    ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) itemType);
+                    break;
+                case TYPED:
+                    ((ITypedAdapterFactory) adapterFactory).configure(configuration);
+                    break;
+            }
         } catch (AlgebricksException ae) {
             throw ae;
         } catch (Exception e) {
@@ -438,25 +445,23 @@
                 adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
             }
 
-            Map<String, Object> configuration = this.wrapProperties(datasetDetails.getProperties());
+            //Map<String, Object> configuration = this.wrapProperties(datasetDetails.getProperties());
+            Map<String, String> configuration = datasetDetails.getProperties();
 
             switch (adapterFactory.getAdapterType()) {
                 case TYPED:
                     adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
+                    ((ITypedAdapterFactory) adapterFactory).configure(configuration);
                     break;
                 case GENERIC:
                     String outputTypeName = datasetDetails.getProperties().get("output-type-name");
                     adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
                             outputTypeName).getDatatype();
+                    ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType);
                     break;
                 default:
                     throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
             }
-            configuration.put("source-datatype", adapterOutputType);
-            adapterFactory.configure(configuration);
-
-        } catch (AlgebricksException ae) {
-            throw ae;
         } catch (Exception e) {
             e.printStackTrace();
             throw new AlgebricksException("unable to create adapter  " + e);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
index b57f8fe..8e6dab3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
@@ -41,6 +41,8 @@
         FEED_BEGIN,
         FEED_END,
         FEED_FAILURE,
+        FEED_RECOVERY,
+        FEED_RESUME,
         FEED_STATS,
         FEED_EXPAND,
         FEED_SHRINK
@@ -145,7 +147,7 @@
 
     @Override
     public int compareTo(FeedActivity o) {
-        return this.activityId - o.getActivityId();
+        return o.getActivityId() - this.activityId;
     }
 
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
index 96f1dca..9c0d3e5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
@@ -25,9 +25,9 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final Map<String, Object> alteredConfParams;
+    private final Map<String, String> alteredConfParams;
 
-    public AlterFeedMessage(Map<String, Object> alteredConfParams) {
+    public AlterFeedMessage(Map<String, String> alteredConfParams) {
         super(MessageType.ALTER);
         this.alteredConfParams = alteredConfParams;
     }
@@ -37,7 +37,7 @@
         return MessageType.ALTER;
     }
 
-    public Map<String, Object> getAlteredConfParams() {
+    public Map<String, String> getAlteredConfParams() {
         return alteredConfParams;
     }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
index a74627c..2d40000 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
@@ -43,7 +43,7 @@
     }
 
     private final ARecordType recordType;
-    private final Map<String, Object> configuration;
+    private final Map<String, String> configuration;
     private IValueParserFactory[] valueParserFactories;
     private char delimiter;
     private final ParserType parserType;
@@ -55,7 +55,7 @@
     }
 
     public ConditionalPushTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, Map<String, Object> configuration) {
+            char fieldDelimiter, Map<String, String> configuration) {
         this.recordType = recordType;
         this.valueParserFactories = valueParserFactories;
         this.delimiter = fieldDelimiter;
@@ -64,7 +64,7 @@
 
     }
 
-    public ConditionalPushTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
+    public ConditionalPushTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
         this.recordType = recordType;
         this.configuration = configuration;
         this.parserType = ParserType.ADM;
@@ -88,7 +88,7 @@
     public static final String BATCH_INTERVAL = "batch-interval";
 
     public ConditionalPushTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
-            Map<String, Object> configuration) {
+            Map<String, String> configuration) {
         super(ctx, recType);
         this.dataParser = dataParser;
         String propValue = (String) configuration.get(BATCH_SIZE);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 3893e54..0d46e43 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -20,7 +20,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
 import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
@@ -82,8 +81,14 @@
             }
             FeedManager.INSTANCE.deRegisterFeedRuntime(adapterRuntimeMgr);
         } catch (InterruptedException ie) {
-            // check policy
-            adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
+            if (policyEnforcer.getFeedPolicyAccessor().continueOnHardwareFailure()) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Continuing on failure as per feed policy");
+                }
+                adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
+            } else {
+                throw new HyracksDataException(ie);
+            }
         } catch (Exception e) {
             e.printStackTrace();
             throw new HyracksDataException(e);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 03daa145..b02e05b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -14,6 +14,9 @@
  */
 package edu.uci.ics.asterix.metadata.feeds;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -24,6 +27,8 @@
  */
 public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
 
+    private static final Logger LOGGER = Logger.getLogger(FeedMessageOperatorNodePushable.class.getName());
+
     private final FeedId feedId;
     private final IFeedMessage feedMessage;
     private final int partition;
@@ -43,6 +48,9 @@
             if (adapterRuntimeMgr != null) {
                 switch (feedMessage.getMessageType()) {
                     case END:
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Ending feed:" + feedId);
+                        }
                         adapterRuntimeMgr.stop();
                         break;
                     case ALTER:
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
index cc8e165..5ea79d0 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
@@ -67,11 +67,6 @@
     public AdapterType getAdapterType();
 
     /**
-     * @param configuration
-     */
-    public void configure(Map<String, Object> configuration) throws Exception;
-
-    /**
      * Returns a list of partition constraints. A partition constraint can be a
      * requirement to execute at a particular location or could be cardinality
      * constraints indicating the number of instances that need to run in
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
index 9c5a612..164d292 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
@@ -38,6 +38,6 @@
      *            A HashMap containing the set of configuration parameters
      *            that need to be altered.
      */
-    public void alter(Map<String, Object> properties);
+    public void alter(Map<String, String> properties);
 
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
index a871aba..76ea93a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
@@ -1,8 +1,12 @@
 package edu.uci.ics.asterix.metadata.feeds;
 
+import java.util.Map;
+
 import edu.uci.ics.asterix.om.types.ARecordType;
 
 public interface ITypedAdapterFactory extends IAdapterFactory {
 
     public ARecordType getAdapterOutputType();
+
+    public void configure(Map<String, String> configuration) throws Exception;
 }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
index 96cba42..81fd78d 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.asterix.om.util;
 
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -25,8 +26,12 @@
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 
+import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
+import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
 import edu.uci.ics.asterix.event.schema.cluster.Cluster;
 import edu.uci.ics.asterix.event.schema.cluster.Node;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 
 /**
  * A holder class for properties related to the Asterix cluster.
@@ -45,6 +50,8 @@
     public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
     private final Cluster cluster;
 
+    private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint;
+
     private AsterixClusterProperties() {
         InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
         if (is != null) {
@@ -71,6 +78,7 @@
     public void removeNCConfiguration(String nodeId) {
         // state = State.UNUSABLE;
         ncConfiguration.remove(nodeId);
+        resetClusterPartitionConstraint();
     }
 
     public void addNCConfiguration(String nodeId, Map<String, String> configuration) {
@@ -82,6 +90,7 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(" Registering configuration parameters for node id" + nodeId);
         }
+        resetClusterPartitionConstraint();
     }
 
     /**
@@ -118,4 +127,27 @@
         return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
     }
 
+    public AlgebricksPartitionConstraint getClusterLocations() {
+        if (clusterPartitionConstraint == null) {
+            resetClusterPartitionConstraint();
+        }
+        return clusterPartitionConstraint;
+    }
+
+    private void resetClusterPartitionConstraint() {
+        Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
+        ArrayList<String> locs = new ArrayList<String>();
+        for (String i : stores.keySet()) {
+            String[] nodeStores = stores.get(i);
+            int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
+            for (int j = 0; j < nodeStores.length; j++) {
+                for (int k = 0; k < numIODevices; k++) {
+                    locs.add(i);
+                }
+            }
+        }
+        String[] cluster = new String[locs.size()];
+        cluster = locs.toArray(cluster);
+        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(cluster);
+    }
 }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
index b693073..bd456a2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
@@ -24,13 +24,13 @@
 
     private static final Logger LOGGER = Logger.getLogger(GenericSocketFeedAdapter.class.getName());
 
-    private Map<String, Object> configuration;
+    private Map<String, String> configuration;
 
     private SocketFeedServer socketFeedServer;
 
     private static final int DEFAULT_PORT = 2909;
 
-    public GenericSocketFeedAdapter(Map<String, Object> configuration, ITupleParserFactory parserFactory,
+    public GenericSocketFeedAdapter(Map<String, String> configuration, ITupleParserFactory parserFactory,
             ARecordType outputtype, IHyracksTaskContext ctx) throws AsterixException, IOException {
         super(parserFactory, outputtype, ctx);
         this.configuration = configuration;
@@ -53,7 +53,7 @@
         private ServerSocket serverSocket;
         private InputStream inputStream;
 
-        public SocketFeedServer(Map<String, Object> configuration, ARecordType outputtype, int port)
+        public SocketFeedServer(Map<String, String> configuration, ARecordType outputtype, int port)
                 throws IOException, AsterixException {
             try {
                 serverSocket = new ServerSocket(port);
@@ -90,7 +90,7 @@
     }
 
     @Override
-    public void alter(Map<String, Object> properties) {
+    public void alter(Map<String, String> properties) {
         // TODO Auto-generated method stub
 
     }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
index 961e519..788bb4f 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
@@ -18,7 +18,7 @@
 
 import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -30,7 +30,7 @@
  * Data received is transformed into Asterix Data Format (ADM) and stored into
  * a dataset a configured in the Adapter configuration.
  */
-public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory implements ITypedAdapterFactory {
+public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
 
     /**
      * 
@@ -55,9 +55,9 @@
     }
 
     @Override
-    public void configure(Map<String, Object> configuration) throws Exception {
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
         this.configuration = configuration;
-        outputType = (ARecordType) configuration.get(KEY_SOURCE_DATATYPE);
+        outputType = (ARecordType) outputType;
         this.configureFormat(outputType);
     }
 
@@ -71,9 +71,4 @@
         return new GenericSocketFeedAdapter(configuration, parserFactory, outputType, ctx);
     }
 
-    @Override
-    public ARecordType getAdapterOutputType() {
-        return outputType;
-    }
-
 }
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index 81d19ae..80565b2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -35,7 +35,7 @@
     private static final long serialVersionUID = 1L;
     private FileSystemBasedAdapter coreAdapter;
 
-    public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, Object> configuration,
+    public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, String> configuration,
             FileSystemBasedAdapter coreAdapter, String format, ITupleParserFactory parserFactory,
             IHyracksTaskContext ctx) throws Exception {
         super(parserFactory, atype, ctx);
@@ -48,8 +48,8 @@
     }
 
     @Override
-    public void alter(Map<String, Object> properties) {
-        ((RateControlledTupleParser) tupleParser).setInterTupleInterval(Long.parseLong((String) properties
+    public void alter(Map<String, String> properties) {
+        ((RateControlledTupleParser) tupleParser).setInterTupleInterval(Long.parseLong(properties
                 .get(RateControlledTupleParser.INTER_TUPLE_INTERVAL)));
     }
 
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index dbe5d9a..1de9c9f 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -19,12 +19,12 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
 import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
@@ -47,7 +47,8 @@
  * on the local file system or on HDFS. The feed ends when the content of the
  * source file has been ingested.
  */
-public class RateControlledFileSystemBasedAdapterFactory extends StreamBasedAdapterFactory {
+public class RateControlledFileSystemBasedAdapterFactory extends StreamBasedAdapterFactory implements
+        IGenericAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String KEY_FILE_SYSTEM = "fs";
@@ -56,9 +57,9 @@
     public static final String KEY_PATH = "path";
     public static final String KEY_FORMAT = "format";
 
-    private IAdapterFactory adapterFactory;
+    private IGenericAdapterFactory adapterFactory;
     private String format;
-    private Map<String, Object> configuration;
+    private Map<String, String> configuration;
     private ARecordType atype;
 
     @Override
@@ -72,7 +73,7 @@
         return "file_feed";
     }
 
-    private void checkRequiredArgs(Map<String, Object> configuration) throws Exception {
+    private void checkRequiredArgs(Map<String, String> configuration) throws Exception {
         if (configuration.get(KEY_FILE_SYSTEM) == null) {
             throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
         }
@@ -98,7 +99,7 @@
     }
 
     @Override
-    public void configure(Map<String, Object> configuration) throws Exception {
+    public void configure(Map<String, String> configuration, ARecordType recordType) throws Exception {
         this.configuration = configuration;
         checkRequiredArgs(configuration);
         String fileSystem = (String) configuration.get(KEY_FILE_SYSTEM);
@@ -110,11 +111,11 @@
         } else {
             throw new AsterixException("Unsupported file system type " + fileSystem);
         }
-        format = (String) configuration.get(KEY_FORMAT);
-        adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
-        adapterFactory.configure(configuration);
+        format = configuration.get(KEY_FORMAT);
+        adapterFactory = (IGenericAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
+        adapterFactory.configure(configuration, recordType);
 
-        atype = (ARecordType) configuration.get(KEY_SOURCE_DATATYPE);
+        atype = (ARecordType) recordType;
         configureFormat();
     }
 
@@ -163,7 +164,7 @@
     private static final long serialVersionUID = 1L;
 
     private final ARecordType recordType;
-    private final Map<String, Object> configuration;
+    private final Map<String, String> configuration;
     private IValueParserFactory[] valueParserFactories;
     private char delimiter;
     private final ParserType parserType;
@@ -174,7 +175,7 @@
     }
 
     public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, Map<String, Object> configuration) {
+            char fieldDelimiter, Map<String, String> configuration) {
         this.recordType = recordType;
         this.valueParserFactories = valueParserFactories;
         this.delimiter = fieldDelimiter;
@@ -182,7 +183,7 @@
         this.parserType = ParserType.DELIMITED_DATA;
     }
 
-    public RateControlledTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
+    public RateControlledTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
         this.recordType = recordType;
         this.configuration = configuration;
         this.parserType = ParserType.ADM;
@@ -214,10 +215,10 @@
     public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
 
     public RateControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
-            Map<String, Object> configuration) {
+            Map<String, String> configuration) {
         super(ctx, recType);
         this.dataParser = dataParser;
-        String propValue = (String) configuration.get(INTER_TUPLE_INTERVAL);
+        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
         if (propValue != null) {
             interTupleInterval = Long.parseLong(propValue);
         } else {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
index 0e58ed2..d26e764 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
@@ -36,9 +36,9 @@
 
     private static final Logger LOGGER = Logger.getLogger(SyntheticTwitterFeedAdapter.class.getName());
 
-    private Map<String, Object> configuration;
+    private Map<String, String> configuration;
 
-    public SyntheticTwitterFeedAdapter(Map<String, Object> configuration, ARecordType outputType,
+    public SyntheticTwitterFeedAdapter(Map<String, String> configuration, ARecordType outputType,
             IHyracksTaskContext ctx) throws AsterixException {
         super(configuration, ctx);
         this.configuration = configuration;
@@ -71,7 +71,7 @@
         private int tweetCountBeforeException = 0;
         private int exceptionPeriod = -1;
 
-        public SyntheticTwitterFeedClient(Map<String, Object> configuration, ARecordType outputRecordType, int partition)
+        public SyntheticTwitterFeedClient(Map<String, String> configuration, ARecordType outputRecordType, int partition)
                 throws AsterixException {
             this.outputRecordType = outputRecordType;
             String value = (String) configuration.get(KEY_DURATION);
@@ -164,7 +164,7 @@
         }
 
         @Override
-        public boolean alter(Map<String, Object> configuration) {
+        public boolean alter(Map<String, String> configuration) {
             // TODO Auto-generated method stub
             return false;
         }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
index afb57bf..408d648 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -49,7 +49,7 @@
      */
     private static final long serialVersionUID = 1L;
 
-    private Map<String, Object> configuration;
+    private Map<String, String> configuration;
 
     private static final String KEY_DATAVERSE_DATASET = "dataverse-dataset";
 
@@ -71,7 +71,7 @@
     }
 
     @Override
-    public void configure(Map<String, Object> configuration) throws Exception {
+    public void configure(Map<String, String> configuration) throws Exception {
         this.configuration = configuration;
     }
 
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
index 9cae374..9d47de2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
@@ -2,11 +2,8 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -56,13 +53,13 @@
     private byte[] EOL = "\n".getBytes();
     private OutputStream os;
 
-    public TweetGenerator(Map<String, Object> configuration, ARecordType outputRecordType, int partition, String format)
+    public TweetGenerator(Map<String, String> configuration, ARecordType outputRecordType, int partition, String format)
             throws AsterixException {
         this.outputRecordType = outputRecordType;
-        String value = (String) configuration.get(KEY_DURATION);
+        String value = configuration.get(KEY_DURATION);
         duration = value != null ? Integer.parseInt(value) : 60;
-        initializeTweetRate((String) configuration.get(KEY_TPS));
-        value = (String) configuration.get(KEY_EXCEPTION_PERIOD);
+        initializeTweetRate(configuration.get(KEY_TPS));
+        value = configuration.get(KEY_EXCEPTION_PERIOD);
         if (value != null) {
             exceptionPeriod = Integer.parseInt(value);
         }
@@ -158,7 +155,7 @@
     }
 
     @Override
-    public boolean alter(Map<String, Object> configuration) {
+    public boolean alter(Map<String, String> configuration) {
         // TODO Auto-generated method stub
         return false;
     }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index c8fa860..a0d13ed 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -30,7 +30,7 @@
 
     private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseFeedAdapter.class.getName());
 
-    private Map<String, Object> configuration;
+    private Map<String, String> configuration;
 
     private TweetGenerator twitterFeedClient;
 
@@ -41,7 +41,9 @@
     private static final String LOCALHOST = "127.0.0.1";
     private static final int PORT = 2909;
 
-    public TwitterFirehoseFeedAdapter(Map<String, Object> configuration, ITupleParserFactory parserFactory,
+    public static final String SIMULATE_UNREACHABLE_SERVER = "simulate-unreachable-server";
+
+    public TwitterFirehoseFeedAdapter(Map<String, String> configuration, ITupleParserFactory parserFactory,
             ARecordType outputtype, IHyracksTaskContext ctx) throws AsterixException, IOException {
         super(parserFactory, outputtype, ctx);
         this.configuration = configuration;
@@ -68,7 +70,7 @@
         private final Listener listener;
         private int port = -1;
 
-        public TwitterServer(Map<String, Object> configuration, ARecordType outputtype) throws IOException,
+        public TwitterServer(Map<String, String> configuration, ARecordType outputtype) throws IOException,
                 AsterixException {
             int numAttempts = 0;
             while (port < 0) {
@@ -128,7 +130,7 @@
         private TweetGenerator tweetGenerator;
         private boolean continuePush = true;
 
-        public Listener(ServerSocket serverSocket, Map<String, Object> configuration, ARecordType outputtype)
+        public Listener(ServerSocket serverSocket, Map<String, String> configuration, ARecordType outputtype)
                 throws IOException, AsterixException {
             this.serverSocket = serverSocket;
             this.tweetGenerator = new TweetGenerator(configuration, outputtype, 0,
@@ -173,7 +175,7 @@
     }
 
     @Override
-    public void alter(Map<String, Object> properties) {
+    public void alter(Map<String, String> properties) {
         // TODO Auto-generated method stub
 
     }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 9500436..ca16dd3 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -71,7 +71,7 @@
     }
 
     @Override
-    public void configure(Map<String, Object> configuration) throws Exception {
+    public void configure(Map<String, String> configuration) throws Exception {
         this.configuration = configuration;
         configuration.put(KEY_FORMAT, FORMAT_ADM);
         this.configureFormat(initOutputType());