checkpoint
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index ed0cab1..b519a39 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -33,7 +33,6 @@
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
 import edu.uci.ics.asterix.metadata.bootstrap.AsterixStateProxy;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.application.ICCApplicationEntryPoint;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index cebc710..0e22cc0 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -82,6 +82,7 @@
         if (!work.isEmpty()) {
             executeWorkSet(work);
         }
+      
 
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
similarity index 75%
rename from asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
rename to asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index c436fde..8e01f09 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -12,8 +12,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.metadata.feeds;
+package edu.uci.ics.asterix.hyracks.bootstrap;
 
+import java.io.PrintWriter;
 import java.io.Serializable;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
@@ -29,7 +30,15 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.BeginFeedStatement;
+import edu.uci.ics.asterix.aql.expression.DataverseDecl;
+import edu.uci.ics.asterix.aql.expression.Identifier;
+import edu.uci.ics.asterix.aql.translator.AqlTranslator;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedFailure.FailureType;
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
@@ -39,9 +48,13 @@
 import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
 import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailure.FailureType;
+import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
+import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties.State;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
@@ -70,6 +83,7 @@
 
     private LinkedBlockingQueue<Message> jobEventInbox;
     private LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
+    private State state;
 
     private FeedLifecycleListener() {
         jobEventInbox = new LinkedBlockingQueue<Message>();
@@ -80,6 +94,7 @@
         new Thread(feedJobNotificationHandler).start();
         new Thread(feedWorkRequestResponseHandler).start();
         ClusterManager.INSTANCE.registerSubscriber(this);
+        state = AsterixClusterProperties.INSTANCE.getState();
     }
 
     private final FeedJobNotificationHandler feedJobNotificationHandler;
@@ -107,16 +122,19 @@
         JobSpecification spec = acggf.getJobSpecification();
         boolean feedIngestionJob = false;
         FeedId feedId = null;
+        String feedPolicy = null;
         for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
             if (!(opDesc instanceof FeedIntakeOperatorDescriptor)) {
                 continue;
             }
             feedId = ((FeedIntakeOperatorDescriptor) opDesc).getFeedId();
+            feedPolicy = ((FeedIntakeOperatorDescriptor) opDesc).getFeedPolicy().get(
+                    BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
             feedIngestionJob = true;
             break;
         }
         if (feedIngestionJob) {
-            feedJobNotificationHandler.registerFeed(feedId, jobId, spec);
+            feedJobNotificationHandler.registerFeed(feedId, jobId, spec, feedPolicy);
         }
 
     }
@@ -168,11 +186,11 @@
             return registeredFeeds.containsKey(jobId);
         }
 
-        public void registerFeed(FeedId feedId, JobId jobId, JobSpecification jobSpec) {
+        public void registerFeed(FeedId feedId, JobId jobId, JobSpecification jobSpec, String feedPolicy) {
             if (registeredFeeds.containsKey(jobId)) {
                 throw new IllegalStateException(" Feed already registered ");
             }
-            registeredFeeds.put(jobId, new FeedInfo(feedId, jobSpec));
+            registeredFeeds.put(jobId, new FeedInfo(feedId, jobSpec, feedPolicy));
         }
 
         @Override
@@ -249,6 +267,7 @@
 
                 feedActivityDetails.put(FeedActivity.FeedActivityDetails.INGEST_LOCATIONS, ingestLocs.toString());
                 feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS, computeLocs.toString());
+                feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, feedInfo.feedPolicy);
 
                 FeedActivity feedActivity = new FeedActivity(feedInfo.feedId.getDataverse(),
                         feedInfo.feedId.getDataset(), FeedActivityType.FEED_BEGIN, feedActivityDetails);
@@ -316,10 +335,12 @@
         public List<String> ingestLocations = new ArrayList<String>();
         public List<String> computeLocations = new ArrayList<String>();
         public JobInfo jobInfo;
+        public String feedPolicy;
 
-        public FeedInfo(FeedId feedId, JobSpecification jobSpec) {
+        public FeedInfo(FeedId feedId, JobSpecification jobSpec, String feedPolicy) {
             this.feedId = feedId;
             this.jobSpec = jobSpec;
+            this.feedPolicy = feedPolicy;
         }
 
     }
@@ -405,7 +426,26 @@
 
     @Override
     public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
-        // TODO Auto-generated method stub
+        State newState = AsterixClusterProperties.INSTANCE.getState();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(joinedNodeId + " joined the cluster. " + "Asterix state: " + newState);
+        }
+        if (!newState.equals(state)) {
+            if (newState == State.ACTIVE) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info(joinedNodeId + " Resuming loser feeds (if any)");
+                }
+                try {
+                    FeedsActivator activator = new FeedsActivator();
+                    (new Thread(activator)).start();
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Exception in resuming feeds" + e.getMessage());
+                    }
+                }
+            }
+            state = newState;
+        }
         return null;
     }
 
@@ -422,24 +462,84 @@
 
     @Override
     public void notifyStateChange(State previousState, State newState) {
-       switch(newState){
-           case ACTIVE:
-                 if(previousState.equals((State.UNUSABLE)){
-                     resumeActiveFeeds();
-                 }
-                 break;
-       }
-        
+        switch (newState) {
+            case ACTIVE:
+                if (previousState.equals(State.UNUSABLE)) {
+                    try {
+                        FeedsActivator activator = new FeedsActivator();
+                        (new Thread(activator)).start();
+                    } catch (Exception e) {
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Exception in resuming feeds" + e.getMessage());
+                        }
+                    }
+                }
+                break;
+        }
+
     }
 
-    private void resumeActiveFeeds() {
-        MetadataTransactionContext ctx = null;
-        try {
-            ctx = MetadataManager.INSTANCE.beginTransaction();
-            MetadataManager.INSTANCE.getActiveFeeds(ctx);
-        } catch (Exception e) {
+    private static class FeedsActivator implements Runnable {
+
+        @Override
+        public void run() {
+            MetadataTransactionContext ctx = null;
+
+            SessionConfig pc = new SessionConfig(true, false, false, false, false, false, true, false);
+            PrintWriter writer = new PrintWriter(System.out, true);
+            try {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Attempting to Resume feeds!!!!!!");
+                }
+                Thread.sleep(2000);
+                MetadataManager.INSTANCE.init();
+                ctx = MetadataManager.INSTANCE.beginTransaction();
+                List<FeedActivity> activeFeeds = MetadataManager.INSTANCE.getActiveFeeds(ctx);
+                MetadataManager.INSTANCE.commitTransaction(ctx);
+                for (FeedActivity fa : activeFeeds) {
+
+                    String feedPolicy = fa.getFeedActivityDetails().get(FeedActivityDetails.FEED_POLICY_NAME);
+                    String dataverse = fa.getDataverseName();
+                    String datasetName = fa.getDatasetName();
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Resuming loser feed: " + dataverse + ":" + datasetName + " using policy "
+                                + feedPolicy);
+                    }
+                    try {
+                        DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
+                        BeginFeedStatement stmt = new BeginFeedStatement(new Identifier(dataverse), new Identifier(
+                                datasetName), feedPolicy, 0);
+                        List<Statement> statements = new ArrayList<Statement>();
+                        statements.add(dataverseDecl);
+                        statements.add(stmt);
+                        AqlTranslator translator = new AqlTranslator(statements, writer, pc, DisplayFormat.TEXT);
+                        translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, false);
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Resumed feed: " + dataverse + ":" + datasetName + " using policy "
+                                    + feedPolicy);
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Exception in resuming loser feed: " + dataverse + ":" + datasetName
+                                    + " using policy " + feedPolicy + " Exception " + e.getMessage());
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+                try {
+                    MetadataManager.INSTANCE.abortTransaction(ctx);
+                } catch (Exception e1) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.severe("Exception in aborting" + e.getMessage());
+                    }
+                    throw new IllegalStateException(e1);
+                }
+            }
 
         }
+
     }
 
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
similarity index 95%
rename from asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkRequestResponseHandler.java
rename to asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
index 8f9beb7..8a889c9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkRequestResponseHandler.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
@@ -1,24 +1,23 @@
-package edu.uci.ics.asterix.metadata.feeds;
+package edu.uci.ics.asterix.hyracks.bootstrap;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
-import java.util.Map.Entry;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedFailure;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedFailureReport;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedInfo;
 import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
 import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
 import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
 import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailure;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailureReport;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedInfo;
-import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.hyracks.api.constraints.Constraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java
index c643c42..b329171 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java
@@ -99,7 +99,7 @@
 				// instance.setState(State.UNUSABLE);
 			}
 			if (!(instance.getState().equals(State.UNUSABLE))) {
-				// instance.setState(State.ACTIVE);
+				 instance.setState(State.ACTIVE);
 			}
 		} else {
 			if (state.getProcesses() != null && state.getProcesses().size() > 0) {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 5618ef8..1854245 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -60,7 +60,8 @@
     private ConfFactory confFactory;
     private IAType atype;
     private boolean configured = false;
-    public static Scheduler hdfsScheduler = initializeHDFSScheduler();
+    public static Scheduler hdfsScheduler;
+    private static boolean initialized = false;
 
     private static Scheduler initializeHDFSScheduler() {
         ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
@@ -124,6 +125,10 @@
 
     @Override
     public void configure(Map<String, Object> configuration) throws Exception {
+        if (!initialized) {
+            hdfsScheduler = initializeHDFSScheduler();
+            initialized = true;
+        }
         this.configuration = configuration;
         JobConf conf = configureJobConf(configuration);
         confFactory = new ConfFactory(conf);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index 534b495..e3deba9 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -79,11 +79,14 @@
         while (continueIngestion) {
             tupleBuilder.reset();
             try {
+                System.out.println("requesting next tuple");
                 inflowState = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput());
                 switch (inflowState) {
                     case DATA_AVAILABLE:
                         tupleBuilder.addFieldEndOffset();
+                        System.out.println("appending tuple");
                         appendTupleToFrame(writer);
+                        System.out.println("appended tuple");
                         tupleCount++;
                         break;
                     case NO_MORE_DATA:
@@ -103,6 +106,7 @@
                 }
             } catch (Exception failureException) {
                 try {
+                    failureException.printStackTrace();
                     boolean continueIngestion = policyEnforcer.handleSoftwareFailure(failureException);
                     if (continueIngestion) {
                         pullBasedFeedClient.resetOnFailure(failureException);
@@ -120,7 +124,9 @@
 
     private void appendTupleToFrame(IFrameWriter writer) throws HyracksDataException {
         if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+            System.out.println("flushing frame");
             FrameUtils.flushFrame(frame, writer);
+            System.out.println("flushed frame");
             appender.reset(frame, true);
             if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
                     tupleBuilder.getSize())) {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
index a5cb037..a11197b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
@@ -71,6 +71,7 @@
     @Override
     public InflowState nextTuple(DataOutput dataOutput) throws AsterixException {
         try {
+            System.out.println("Setting next record");
             InflowState state = setNextRecord();
             boolean first = true;
             switch (state) {
@@ -85,7 +86,7 @@
                     recordBuilder.init();
                     writeRecord(mutableRecord, dataOutput, recordBuilder);
                     break;
- 
+
                 case DATA_NOT_AVAILABLE:
                     break;
                 case NO_MORE_DATA:
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index b84e357..7c6fba0 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -123,7 +123,7 @@
     @Override
     public void stop() {
         // TODO Auto-generated method stub
-        
+
     }
 
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index a008c1d..ba592c3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -661,8 +661,8 @@
     }
 
     @Override
-    public Collection<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx) throws MetadataException {
-        Collection<FeedActivity> feedActivities = null;
+    public List<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx) throws MetadataException {
+        List<FeedActivity> feedActivities = null;
         try {
             feedActivities = metadataNode.getActiveFeeds(ctx.getJobId());
         } catch (RemoteException e) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index b180063..8bfca10 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -17,7 +17,6 @@
 
 import java.rmi.RemoteException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -1245,7 +1244,8 @@
     }
 
     @Override
-    public Collection<FeedActivity> getActiveFeeds(JobId jobId) throws MetadataException, RemoteException {
+    public List<FeedActivity> getActiveFeeds(JobId jobId) throws MetadataException, RemoteException {
+        List<FeedActivity> activeFeeds = new ArrayList<FeedActivity>();
         try {
             ITupleReference searchKey = createTuple();
             FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
@@ -1262,13 +1262,17 @@
                         initiatedFeeds.put(fid, fa);
                         break;
                     case FEED_FAILURE:
+                        break;
                     case FEED_END:
                         fid = new FeedId(fa.getDataverseName(), fa.getDatasetName());
                         initiatedFeeds.remove(fid);
                         break;
                 }
             }
-            return initiatedFeeds.values();
+            for (FeedActivity fa : initiatedFeeds.values()) {
+                activeFeeds.add(fa);
+            }
+            return activeFeeds;
         } catch (Exception e) {
             throw new MetadataException(e);
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index a7833d2..d71db58 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -485,7 +485,7 @@
      * @return
      * @throws MetadataException
      */
-    public Collection<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx) throws MetadataException;
+    public List<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx) throws MetadataException;
 
     public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException;
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index 9ab483d..e789817 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -531,5 +531,5 @@
      * @throws MetadataException
      * @throws RemoteException
      */
-    public Collection<FeedActivity> getActiveFeeds(JobId jobId) throws MetadataException, RemoteException;
+    public List<FeedActivity> getActiveFeeds(JobId jobId) throws MetadataException, RemoteException;
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 9544abd..775ef9e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -474,6 +474,7 @@
         FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
                 BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
 
+        feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
         FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(
                 dataset.getDataverseName(), dataset.getDatasetName()), adapterFactory, (ARecordType) adapterOutputType,
                 feedDesc, feedPolicy.getProperties());
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
index 72a1f03..48f16b6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.asterix.metadata.entities;
 
-import java.util.List;
 import java.util.Map;
 
 import edu.uci.ics.asterix.metadata.MetadataCache;
@@ -54,6 +53,7 @@
         public static final String INGESTION_RATE = "ingestion-rate";
         public static final String EXCEPTION_LOCATION = "exception-location";
         public static final String EXCEPTION_MESSAGE = "exception-message";
+        public static final String FEED_POLICY_NAME = "feed-policy-name";
 
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFailureHandler.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFailureHandler.java
deleted file mode 100644
index 77dbea0..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFailureHandler.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.event.schema.cluster.Node;
-import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailure;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailure.FailureType;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailureReport;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedInfo;
-import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
-
-public class FeedFailureHandler implements Runnable {
-
-    private LinkedBlockingQueue<FeedFailureReport> inbox = null;
-
-    public FeedFailureHandler(LinkedBlockingQueue<FeedFailureReport> inbox) {
-        this.inbox = inbox;
-    }
-
-    @Override
-    public void run() {
-        while (true) {
-            try {
-                FeedFailureReport failureReport = inbox.take();
-                Map<String, Map<FeedInfo, List<FailureType>>> failureMap = new HashMap<String, Map<FeedInfo, List<FailureType>>>();
-                for (Map.Entry<FeedInfo, List<FeedFailure>> entry : failureReport.failures.entrySet()) {
-                    FeedInfo feedInfo = entry.getKey();
-                    List<FeedFailure> feedFailures = entry.getValue();
-                    for (FeedFailure feedFailure : feedFailures) {
-                        switch (feedFailure.failureType) {
-                            case COMPUTE_NODE:
-                            case INGESTION_NODE:
-                                Map<FeedInfo, List<FailureType>> failuresBecauseOfThisNode = failureMap
-                                        .get(feedFailure.nodeId);
-                                if (failuresBecauseOfThisNode == null) {
-                                    failuresBecauseOfThisNode = new HashMap<FeedInfo, List<FailureType>>();
-                                    failuresBecauseOfThisNode.put(feedInfo, new ArrayList<FailureType>());
-                                    failureMap.put(feedFailure.nodeId, failuresBecauseOfThisNode);
-                                }
-                                List<FailureType> feedF = failuresBecauseOfThisNode.get(feedInfo);
-                                if (feedF == null) {
-                                    feedF = new ArrayList<FailureType>();
-                                    failuresBecauseOfThisNode.put(feedInfo, feedF);
-                                }
-                                feedF.add(feedFailure.failureType);
-                                break;
-                            case STORAGE_NODE:
-                        }
-                    }
-                }
-
-                try {
-                    correctFailure(failureMap);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-
-    }
-
-    private void correctFailure(Map<String, Map<FeedInfo, List<FailureType>>> failureMap) throws AsterixException {
-        for (String nodeId : failureMap.keySet()) {
-            Node node = AsterixClusterProperties.INSTANCE.getAvailableSubstitutionNode();
-            if (node != null) {
-                ClusterManager.INSTANCE.addNode(node);
-            }
-        }
-
-    }
-
-}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 4270fe2..28d0da3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -64,4 +64,8 @@
     public FeedId getFeedId() {
         return feedId;
     }
+
+    public Map<String, String> getFeedPolicy() {
+        return feedPolicy;
+    }
 }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
index 4742b16..5ba983e 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
@@ -19,13 +19,10 @@
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.AUnorderedListType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.tools.external.data.DataGenerator.InitializationInfo;
 import edu.uci.ics.asterix.tools.external.data.DataGenerator.Message;
 import edu.uci.ics.asterix.tools.external.data.DataGenerator.TweetMessage;
 import edu.uci.ics.asterix.tools.external.data.DataGenerator.TweetMessageIterator;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
@@ -36,6 +33,9 @@
 public class SyntheticTwitterFeedAdapter extends PullBasedAdapter {
 
     private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(SyntheticTwitterFeedAdapter.class.getName());
+
     private Map<String, Object> configuration;
 
     public SyntheticTwitterFeedAdapter(Map<String, Object> configuration, ARecordType outputType,
@@ -116,6 +116,7 @@
         private void writeTweet(TweetMessage next) {
 
             //tweet id
+            LOGGER.info("Generating next tweet");
             ((AMutableString) mutableFields[0]).setValue(next.getTweetid());
             mutableRecord.setValueAtPos(0, mutableFields[0]);
 
@@ -149,8 +150,10 @@
             // text
             Message m = next.getMessageText();
             char[] content = m.getMessage();
-            ((AMutableString) mutableFields[5]).setValue(new String(content, 0, m.getLength()));
+            String tweetText = new String(content, 0, m.getLength());
+            ((AMutableString) mutableFields[5]).setValue(tweetText);
             mutableRecord.setValueAtPos(5, mutableFields[5]);
+            LOGGER.info(tweetText);
 
         }
 
@@ -168,10 +171,12 @@
 
         @Override
         public InflowState setNextRecord() throws Exception {
+            LOGGER.info("requesting next tweet");
             boolean moreData = tweetIterator.hasNext();
             if (!moreData) {
                 return InflowState.NO_MORE_DATA;
-            }
+            } 
+            LOGGER.info("writing next tweet");
             writeTweet(tweetIterator.next());
             if (tweetInterval != 0) {
                 tweetCount++;
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
index 3d05870..036cb1c 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -98,9 +98,13 @@
         }
         List<String> storageNodes = ng.getNodeNames();
         Set<String> nodes = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
-        nodes.removeAll(storageNodes);
+        String ingestionLocation = null;
+        if (nodes.size() > storageNodes.size()) {
+            nodes.removeAll(storageNodes);
+        }
+        String[] nodesArray = nodes.toArray(new String[] {});
         Random r = new Random();
-        String ingestionLocation = nodes.toArray(new String[] {})[r.nextInt(nodes.size())];
+        ingestionLocation = nodesArray[r.nextInt(nodes.size())];
         return new AlgebricksAbsolutePartitionConstraint(new String[] { ingestionLocation });
     }