checkpoint
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index d33a499..91b4abb 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -86,9 +86,7 @@
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
-import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -1384,7 +1382,7 @@
             FeedActivity recentActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, bfs
                     .getDatasetName().getValue());
             boolean isFeedActive = FeedOperations.isFeedActive(recentActivity);
-            if (isFeedActive) {
+            if (isFeedActive && !bfs.isForceBegin()) {
                 throw new AsterixException("Feed " + bfs.getDatasetName().getValue()
                         + " is currently ACTIVE. Operation not supported");
             }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
index bbba414..a379a24 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -103,22 +103,22 @@
         IOperatorDescriptor feedMessenger;
         AlgebricksPartitionConstraint messengerPc;
 
-        List<IFeedMessage> feedMessages = new ArrayList<IFeedMessage>();
+        IFeedMessage feedMessage = null;
         switch (controlFeedStatement.getOperationType()) {
             case END:
-                feedMessages.add(new FeedMessage(MessageType.STOP));
+                feedMessage = new FeedMessage(MessageType.END);
                 break;
             case ALTER:
                 Map<String, Object> wrappedProperties = new HashMap<String, Object>();
                 wrappedProperties.putAll(controlFeedStatement.getProperties());
-                feedMessages.add(new AlterFeedMessage(wrappedProperties));
+                feedMessage = new AlterFeedMessage(wrappedProperties);
                 break;
         }
 
         try {
             Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = metadataProvider.buildFeedMessengerRuntime(
                     metadataProvider, spec, (FeedDatasetDetails) dataset.getDatasetDetails(), dataverseName,
-                    datasetName, feedMessages, feedActivity);
+                    datasetName, feedMessage, feedActivity);
             feedMessenger = p.first;
             messengerPc = p.second;
         } catch (AlgebricksException e) {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 0e22cc0..6461b0c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -82,7 +82,6 @@
         if (!work.isEmpty()) {
             executeWorkSet(work);
         }
-      
 
     }
 
@@ -187,17 +186,28 @@
                 if (LOGGER.isLoggable(Level.WARNING)) {
                     LOGGER.warning("Unable to add NC: no more available nodes");
                 }
+
             }
         }
 
         for (AddNodeWork w : nodeAdditionRequests) {
             int n = w.getNumberOfNodes();
             List<String> nodesToBeAddedForWork = new ArrayList<String>();
-            for (int i = 0; i < n; i++) {
+            for (int i = 0; i < n && i < addedNodes.size(); i++) {
                 nodesToBeAddedForWork.add(addedNodes.get(i));
             }
-            AddNodeWorkResponse response = new AddNodeWorkResponse(w, nodesToBeAddedForWork);
-            pendingWorkResponses.add(response);
+            if (nodesToBeAddedForWork.isEmpty()) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Unable to satisfy request by " + w);
+                }
+                AddNodeWorkResponse response = new AddNodeWorkResponse(w, nodesToBeAddedForWork);
+                response.setStatus(Status.FAILURE);
+                w.getSourceSubscriber().notifyRequestCompletion(response);
+
+            } else {
+                AddNodeWorkResponse response = new AddNodeWorkResponse(w, nodesToBeAddedForWork);
+                pendingWorkResponses.add(response);
+            }
         }
 
     }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index dc7dec6..15e6cd5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -52,6 +52,7 @@
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
+import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
 import edu.uci.ics.asterix.metadata.feeds.FeedId;
 import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
@@ -74,6 +75,7 @@
 import edu.uci.ics.hyracks.api.job.JobInfo;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
 
 public class FeedLifecycleListener implements IJobLifecycleListener, IClusterEventsSubscriber, Serializable {
 
@@ -85,6 +87,7 @@
 
     private LinkedBlockingQueue<Message> jobEventInbox;
     private LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
+
     private State state;
 
     private FeedLifecycleListener() {
@@ -223,6 +226,7 @@
 
             List<OperatorDescriptorId> ingestOperatorIds = new ArrayList<OperatorDescriptorId>();
             List<OperatorDescriptorId> computeOperatorIds = new ArrayList<OperatorDescriptorId>();
+            List<OperatorDescriptorId> storageOperatorIds = new ArrayList<OperatorDescriptorId>();
 
             Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
             for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
@@ -236,6 +240,8 @@
                             computeOperatorIds.add(entry.getKey());
                         }
                     }
+                } else if (entry.getValue() instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
+                    storageOperatorIds.add(entry.getKey());
                 }
             }
 
@@ -257,6 +263,10 @@
                         feedInfo.computeLocations.addAll(feedInfo.ingestLocations);
                     }
                 }
+                StringBuilder storageLocs = new StringBuilder();
+                for (OperatorDescriptorId storageOpId : storageOperatorIds) {
+                    feedInfo.storageLocations.addAll(info.getOperatorLocations().get(storageOpId));
+                }
 
                 for (String ingestLoc : feedInfo.ingestLocations) {
                     ingestLocs.append(ingestLoc);
@@ -266,9 +276,14 @@
                     computeLocs.append(computeLoc);
                     computeLocs.append(",");
                 }
+                for (String storageLoc : feedInfo.storageLocations) {
+                    storageLocs.append(storageLoc);
+                    storageLocs.append(",");
+                }
 
                 feedActivityDetails.put(FeedActivity.FeedActivityDetails.INGEST_LOCATIONS, ingestLocs.toString());
                 feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS, computeLocs.toString());
+                feedActivityDetails.put(FeedActivity.FeedActivityDetails.STORAGE_LOCATIONS, storageLocs.toString());
                 feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, feedInfo.feedPolicy);
 
                 FeedActivity feedActivity = new FeedActivity(feedInfo.feedId.getDataverse(),
@@ -336,6 +351,7 @@
         public JobSpecification jobSpec;
         public List<String> ingestLocations = new ArrayList<String>();
         public List<String> computeLocations = new ArrayList<String>();
+        public List<String> storageLocations = new ArrayList<String>();
         public JobInfo jobInfo;
         public String feedPolicy;
 
@@ -376,6 +392,7 @@
     }
 
     private Set<IClusterManagementWork> handleFailure(FeedFailureReport failureReport) {
+        reportFeedFailure(failureReport);
         Set<IClusterManagementWork> work = new HashSet<IClusterManagementWork>();
         Map<String, Map<FeedInfo, List<FailureType>>> failureMap = new HashMap<String, Map<FeedInfo, List<FailureType>>>();
         for (Map.Entry<FeedInfo, List<FeedFailure>> entry : failureReport.failures.entrySet()) {
@@ -397,8 +414,13 @@
                             failuresBecauseOfThisNode.put(feedInfo, feedF);
                         }
                         feedF.add(feedFailure.failureType);
+
                         break;
                     case STORAGE_NODE:
+                        if (LOGGER.isLoggable(Level.SEVERE)) {
+                            LOGGER.severe("Unrecoverable situation! lost storage node for the feed " + feedInfo.feedId);
+                        }
+                        break;
                 }
             }
         }
@@ -409,6 +431,38 @@
         return work;
     }
 
+    private void reportFeedFailure(FeedFailureReport failureReport) {
+        MetadataTransactionContext ctx = null;
+        FeedActivity fa = null;
+        Map<String, String> feedActivityDetails = new HashMap<String, String>();
+        StringBuilder builder = new StringBuilder();
+        try {
+            ctx = MetadataManager.INSTANCE.beginTransaction();
+            for (Entry<FeedInfo, List<FeedFailure>> entry : failureReport.failures.entrySet()) {
+                FeedInfo feedInfo = entry.getKey();
+                List<FeedFailure> feedFailures = entry.getValue();
+                for (FeedFailure failure : feedFailures) {
+                    builder.append(failure + ",");
+                }
+                builder.deleteCharAt(builder.length() - 1);
+                feedActivityDetails.put(FeedActivityDetails.FEED_NODE_FAILURE, builder.toString());
+                fa = new FeedActivity(feedInfo.feedId.getDataverse(), feedInfo.feedId.getDataset(),
+                        FeedActivityType.FEED_FAILURE, feedActivityDetails);
+                MetadataManager.INSTANCE.registerFeedActivity(ctx, feedInfo.feedId, fa);
+            }
+            MetadataManager.INSTANCE.commitTransaction(ctx);
+        } catch (Exception e) {
+            if (ctx != null) {
+                try {
+                    MetadataManager.INSTANCE.abortTransaction(ctx);
+                } catch (Exception e2) {
+                    e2.addSuppressed(e);
+                    throw new IllegalStateException("Unable to abort transaction " + e2);
+                }
+            }
+        }
+    }
+
     public static class FeedFailure {
 
         public enum FailureType {
@@ -424,6 +478,11 @@
             this.failureType = failureType;
             this.nodeId = nodeId;
         }
+
+        @Override
+        public String toString() {
+            return failureType + " (" + nodeId + ") ";
+        }
     }
 
     @Override
@@ -524,6 +583,7 @@
                         DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
                         BeginFeedStatement stmt = new BeginFeedStatement(new Identifier(dataverse), new Identifier(
                                 datasetName), feedPolicy, 0);
+                        stmt.setForceBegin(true);
                         List<Statement> statements = new ArrayList<Statement>();
                         statements.add(dataverseDecl);
                         statements.add(stmt);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
index 8a889c9..6eaf603 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
@@ -18,6 +18,7 @@
 import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
 import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
 import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.hyracks.api.constraints.Constraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
@@ -25,6 +26,7 @@
 import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
 import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
 import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -58,26 +60,32 @@
                     AddNodeWorkResponse resp = (AddNodeWorkResponse) response;
                     switch (resp.getStatus()) {
                         case FAILURE:
+                            if (LOGGER.isLoggable(Level.WARNING)) {
+                                LOGGER.warning("Request " + resp.getWork() + " not completed");
+                            }
                             break;
                         case SUCCESS:
-                            AddNodeWork work = (AddNodeWork) submittedWork;
-                            FeedFailureReport failureReport = feedsWaitingForResponse.remove(work.getWorkId());
-                            Set<FeedInfo> affectedFeeds = failureReport.failures.keySet();
-                            for (FeedInfo feedInfo : affectedFeeds) {
-                                try {
-                                    recoverFeed(feedInfo, resp, failureReport.failures.get(feedInfo));
-                                    if (LOGGER.isLoggable(Level.INFO)) {
-                                        LOGGER.info("Recovered feed:" + feedInfo);
-                                    }
-                                } catch (Exception e) {
-                                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                                        LOGGER.severe("Unable to recover feed:" + feedInfo);
-                                    }
-                                }
+                            if (LOGGER.isLoggable(Level.WARNING)) {
+                                LOGGER.warning("Request " + resp.getWork() + " completed");
                             }
                             break;
                     }
-                    resp.getNodesAdded();
+
+                    AddNodeWork work = (AddNodeWork) submittedWork;
+                    FeedFailureReport failureReport = feedsWaitingForResponse.remove(work.getWorkId());
+                    Set<FeedInfo> affectedFeeds = failureReport.failures.keySet();
+                    for (FeedInfo feedInfo : affectedFeeds) {
+                        try {
+                            recoverFeed(feedInfo, resp, failureReport.failures.get(feedInfo));
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.info("Recovered feed:" + feedInfo);
+                            }
+                        } catch (Exception e) {
+                            if (LOGGER.isLoggable(Level.SEVERE)) {
+                                LOGGER.severe("Unable to recover feed:" + feedInfo);
+                            }
+                        }
+                    }
                     break;
                 case REMOVE_NODE:
                     break;
@@ -95,15 +103,46 @@
             }
         }
         JobSpecification spec = feedInfo.jobSpec;
-        //AsterixAppContextInfo.getInstance().getHcc().startJob(feedInfo.jobSpec);
+        System.out.println("ALTERED Job Spec" + spec);
+        Thread.sleep(3000);
+        AsterixAppContextInfo.getInstance().getHcc().startJob(feedInfo.jobSpec);
     }
 
     private void alterFeedJobSpec(FeedInfo feedInfo, AddNodeWorkResponse resp, String failedNodeId) {
-        Random r = new Random();
-        String[] rnodes = resp.getNodesAdded().toArray(new String[] {});
-        String replacementNode = rnodes[r.nextInt(rnodes.length)];
-        Map<OperatorDescriptorId, IOperatorDescriptor> opMap = feedInfo.jobSpec.getOperatorMap();
-        Set<Constraint> userConstraints = feedInfo.jobSpec.getUserConstraints();
+        String replacementNode = null;
+        switch (resp.getStatus()) {
+            case FAILURE:
+                boolean computeNodeSubstitute = (feedInfo.computeLocations.contains(failedNodeId) && feedInfo.computeLocations
+                        .size() > 1);
+                if (computeNodeSubstitute) {
+                    feedInfo.computeLocations.remove(failedNodeId);
+                    replacementNode = feedInfo.computeLocations.get(0);
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Compute node:" + replacementNode + " chosen to replace " + failedNodeId);
+                    }
+                } else {
+                    replacementNode = feedInfo.storageLocations.get(0);
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Storage node:" + replacementNode + " chosen to replace " + failedNodeId);
+                    }
+                }
+                break;
+            case SUCCESS:
+                Random r = new Random();
+                String[] rnodes = resp.getNodesAdded().toArray(new String[] {});
+                replacementNode = rnodes[r.nextInt(rnodes.length)];
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Newly added node:" + replacementNode + " chosen to replace " + failedNodeId);
+                }
+
+                break;
+        }
+        replaceNode(feedInfo.jobSpec, failedNodeId, replacementNode);
+    }
+
+    private void replaceNode(JobSpecification jobSpec, String failedNodeId, String replacementNode) {
+        Map<OperatorDescriptorId, IOperatorDescriptor> opMap = jobSpec.getOperatorMap();
+        Set<Constraint> userConstraints = jobSpec.getUserConstraints();
         List<Constraint> locationConstraintsToReplace = new ArrayList<Constraint>();
         List<Constraint> countConstraintsToReplace = new ArrayList<Constraint>();
         List<OperatorDescriptorId> modifiedOperators = new ArrayList<OperatorDescriptorId>();
@@ -161,22 +200,29 @@
             }
         }
 
-        feedInfo.jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
-        feedInfo.jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
+        jobSpec.getUserConstraints().removeAll(locationConstraintsToReplace);
+        jobSpec.getUserConstraints().removeAll(countConstraintsToReplace);
 
         for (OperatorDescriptorId mopId : modifiedOperators) {
             List<Constraint> clist = candidateConstraints.get(mopId);
             if (clist != null && !clist.isEmpty()) {
-                feedInfo.jobSpec.getUserConstraints().removeAll(clist);
+                jobSpec.getUserConstraints().removeAll(clist);
+
+                for (Constraint c : clist) {
+                    if (c.getLValue().getTag().equals(ExpressionTag.PARTITION_LOCATION)) {
+                        ConstraintExpression cexpr = c.getRValue();
+                        String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+                        newConstraints.get(mopId).add(oldLocation);
+                    }
+                }
             }
         }
 
         for (Entry<OperatorDescriptorId, List<String>> entry : newConstraints.entrySet()) {
             OperatorDescriptorId nopId = entry.getKey();
             List<String> clist = entry.getValue();
-            IOperatorDescriptor op = feedInfo.jobSpec.getOperatorMap().get(nopId);
-            PartitionConstraintHelper.addAbsoluteLocationConstraint(feedInfo.jobSpec, op,
-                    clist.toArray(new String[] {}));
+            IOperatorDescriptor op = jobSpec.getOperatorMap().get(nopId);
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec, op, clist.toArray(new String[] {}));
         }
 
     }
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java
index d443165..8262b41 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java
@@ -40,6 +40,7 @@
     private final String policy;
     private Query query;
     private int varCounter;
+    private boolean forceBegin = false;
 
     public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed";
 
@@ -128,4 +129,12 @@
         visitor.visit(this, arg);
     }
 
+    public boolean isForceBegin() {
+        return forceBegin;
+    }
+
+    public void setForceBegin(boolean forceBegin) {
+        this.forceBegin = forceBegin;
+    }
+
 }
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 0b1a7c1..4fdae83 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -533,7 +533,8 @@
       }
 
       // TODO use fctName.library
-      return new FunctionSignature(fctName.dataverse, fctName.function, arity);
+      String fqFunctionName = fctName.library == null ? fctName.function : fctName.library + "#" + fctName.function;
+      return new FunctionSignature(fctName.dataverse, fqFunctionName, arity);
     }
 }
 
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 5192e06..55e6664 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -22,6 +22,8 @@
 	<xs:element name="client_port" type="xs:integer" />
 	<xs:element name="cluster_port" type="xs:integer" />
 	<xs:element name="http_port" type="xs:integer" />
+	<xs:element name="debug_port" type="xs:integer" />
+	
 
 	<!-- definition of complex elements -->
 	<xs:element name="working_dir">
@@ -44,6 +46,7 @@
 				<xs:element ref="cl:client_port" />
 				<xs:element ref="cl:cluster_port" />
 				<xs:element ref="cl:http_port" />
+				<xs:element ref="cl:debug_port" minOccurs="0" />
 			</xs:sequence>
 		</xs:complexType>
 	</xs:element>
@@ -75,6 +78,7 @@
 				<xs:element ref="cl:txn_log_dir" minOccurs="0" />
 				<xs:element ref="cl:store" minOccurs="0" />
 				<xs:element ref="cl:iodevices" minOccurs="0" />
+				<xs:element ref="cl:debug_port" minOccurs="0" />
 			</xs:sequence>
 		</xs:complexType>
 	</xs:element>
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java
index c503911..2317cfa 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java
@@ -41,7 +41,7 @@
 public class EventDriver {
 
     public static final String CLIENT_NODE_ID = "client_node";
-    public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null);
+    public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null, null);
 
     private static String eventsDir;
     private static Events events;
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
index fd877e6..82b6e9e 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
@@ -63,6 +63,13 @@
                         if (javaOpts != null) {
                             builder.append(javaOpts);
                         }
+                        if (node.getDebugPort() != null) {
+                            int debugPort = node.getDebugPort().intValue();
+                            if (!javaOpts.contains("-Xdebug")) {
+                                builder.append((" "
+                                        + "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=" + debugPort));
+                            }
+                        }
                         builder.append("\"");
                         envBuffer.append("JAVA_OPTS" + "=" + builder + " ");
                     }
@@ -74,6 +81,13 @@
                         if (javaOpts != null) {
                             builder.append(javaOpts);
                         }
+                        if (node.getDebugPort() != null) {
+                            int debugPort = node.getDebugPort().intValue();
+                            if (!javaOpts.contains("-Xdebug")) {
+                                builder.append((" "
+                                        + "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=" + debugPort));
+                            }
+                        }
                         builder.append("\"");
                         envBuffer.append("JAVA_OPTS" + "=" + builder + " ");
                     }
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
index 02ddb28..49b7abf 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
@@ -35,13 +35,12 @@
 
 public class EventUtil {
 
-	public static final String EVENTS_DIR = "events";
-	public static final String CLUSTER_CONF = "config/cluster.xml";
-	public static final String PATTERN_CONF = "config/pattern.xml";
-	public static final DateFormat dateFormat = new SimpleDateFormat(
-			"yyyy/MM/dd HH:mm:ss");
-	public static final String NC_JAVA_OPTS = "nc.java.opts";
-	public static final String CC_JAVA_OPTS = "cc.java.opts";
+    public static final String EVENTS_DIR = "events";
+    public static final String CLUSTER_CONF = "config/cluster.xml";
+    public static final String PATTERN_CONF = "config/pattern.xml";
+    public static final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+    public static final String NC_JAVA_OPTS = "nc.java.opts";
+    public static final String CC_JAVA_OPTS = "cc.java.opts";
 
     private static final String IP_LOCATION = "IP_LOCATION";
     private static final String CLUSTER_ENV = "ENV";
@@ -51,22 +50,21 @@
     private static final String LOCALHOST = "localhost";
     private static final String LOCALHOST_IP = "127.0.0.1";
 
-	public static Cluster getCluster(String clusterConfigurationPath)
-			throws JAXBException {
-		File file = new File(clusterConfigurationPath);
-		JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
-		Unmarshaller unmarshaller = ctx.createUnmarshaller();
-		Cluster cluster = (Cluster) unmarshaller.unmarshal(file);
-		if (cluster.getMasterNode().getClusterIp().equals(LOCALHOST)) {
-			cluster.getMasterNode().setClusterIp(LOCALHOST_IP);
-		}
-		for (Node node : cluster.getNode()) {
-			if (node.getClusterIp().equals(LOCALHOST)) {
-				node.setClusterIp(LOCALHOST_IP);
-			}
-		}
-		return cluster;
-	}
+    public static Cluster getCluster(String clusterConfigurationPath) throws JAXBException {
+        File file = new File(clusterConfigurationPath);
+        JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
+        Unmarshaller unmarshaller = ctx.createUnmarshaller();
+        Cluster cluster = (Cluster) unmarshaller.unmarshal(file);
+        if (cluster.getMasterNode().getClusterIp().equals(LOCALHOST)) {
+            cluster.getMasterNode().setClusterIp(LOCALHOST_IP);
+        }
+        for (Node node : cluster.getNode()) {
+            if (node.getClusterIp().equals(LOCALHOST)) {
+                node.setClusterIp(LOCALHOST_IP);
+            }
+        }
+        return cluster;
+    }
 
     public static long parseTimeInterval(ValueType v, String unit) throws IllegalArgumentException {
         int val = 0;
@@ -193,17 +191,14 @@
             return EventDriver.CLIENT_NODE;
         }
 
-		if (nodeid.equals(cluster.getMasterNode().getId())) {
-			String logDir = cluster.getMasterNode().getLogDir() == null ? cluster
-					.getLogDir()
-					: cluster.getMasterNode().getLogDir();
-			String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster
-					.getJavaHome()
-					: cluster.getMasterNode().getJavaHome();
-			return new Node(cluster.getMasterNode().getId(), cluster
-					.getMasterNode().getClusterIp(), javaHome, logDir, null,
-					null, null);
-		}
+        if (nodeid.equals(cluster.getMasterNode().getId())) {
+            String logDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir() : cluster.getMasterNode()
+                    .getLogDir();
+            String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster
+                    .getMasterNode().getJavaHome();
+            return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir,
+                    null, null, null, cluster.getMasterNode().getDebugPort());
+        }
 
         List<Node> nodeList = cluster.getNode();
         for (Node node : nodeList) {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 1854245..d50ea72 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -40,7 +40,7 @@
  * A factory class for creating an instance of HDFSAdapter
  */
 @SuppressWarnings("deprecation")
-public class HDFSAdapterFactory extends FileSystemAdapterFactory {
+public class HDFSAdapterFactory extends StreamBasedAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String HDFS_ADAPTER_NAME = "hdfs";
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 16e6ef7..b29e150 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -38,7 +38,7 @@
  * A factory class for creating an instance of HiveAdapter
  */
 @SuppressWarnings("deprecation")
-public class HiveAdapterFactory extends FileSystemAdapterFactory {
+public class HiveAdapterFactory extends StreamBasedAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String HDFS_ADAPTER_NAME = "hdfs";
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 3d8bedf..5f23f96 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -32,7 +32,7 @@
  * NCFileSystemAdapter reads external data residing on the local file system of
  * an NC.
  */
-public class NCFileSystemAdapterFactory extends FileSystemAdapterFactory {
+public class NCFileSystemAdapterFactory extends StreamBasedAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/FileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
similarity index 80%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/FileSystemAdapterFactory.java
rename to asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
index 0fa428d..1287de4 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/FileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
@@ -9,6 +9,7 @@
 import edu.uci.ics.asterix.external.util.DNSResolverFactory;
 import edu.uci.ics.asterix.external.util.INodeResolver;
 import edu.uci.ics.asterix.external.util.INodeResolverFactory;
+import edu.uci.ics.asterix.metadata.feeds.ConditionalPushTupleParserFactory;
 import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
@@ -25,7 +26,10 @@
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
-public abstract class FileSystemAdapterFactory implements IAdapterFactory {
+public abstract class StreamBasedAdapterFactory implements IAdapterFactory {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(StreamBasedAdapterFactory.class.getName());
 
     protected Map<String, Object> configuration;
     protected static INodeResolver nodeResolver;
@@ -36,13 +40,11 @@
     public static final String KEY_DELIMITER = "delimiter";
     public static final String KEY_PATH = "path";
     public static final String KEY_SOURCE_DATATYPE = "source-datatype";
-
     public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
     public static final String FORMAT_ADM = "adm";
-
     public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
-
-    private static final Logger LOGGER = Logger.getLogger(FileSystemAdapterFactory.class.getName());
+    public static final String BATCH_SIZE = "batch-size";
+    public static final String BATCH_INTERVAL = "batch-interval";
 
     protected ITupleParserFactory parserFactory;
     protected ITupleParser parser;
@@ -56,7 +58,8 @@
         typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
     }
 
-    protected ITupleParserFactory getDelimitedDataTupleParserFactory(ARecordType recordType) throws AsterixException {
+    protected ITupleParserFactory getDelimitedDataTupleParserFactory(ARecordType recordType, boolean conditionalPush)
+            throws AsterixException {
         int n = recordType.getFieldTypes().length;
         IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
         for (int i = 0; i < n; i++) {
@@ -73,12 +76,16 @@
         }
 
         Character delimiter = delimiterValue.charAt(0);
-        return new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories, delimiter);
+
+        return conditionalPush ? new ConditionalPushTupleParserFactory(recordType, fieldParserFactories, delimiter,
+                configuration) : new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories, delimiter);
     }
 
-    protected ITupleParserFactory getADMDataTupleParserFactory(ARecordType recordType) throws AsterixException {
+    protected ITupleParserFactory getADMDataTupleParserFactory(ARecordType recordType, boolean conditionalPush)
+            throws AsterixException {
         try {
-            return new AdmSchemafullRecordParserFactory(recordType);
+            return conditionalPush ? new ConditionalPushTupleParserFactory(recordType, configuration)
+                    : new AdmSchemafullRecordParserFactory(recordType);
         } catch (Exception e) {
             throw new AsterixException(e);
         }
@@ -86,15 +93,21 @@
     }
 
     protected void configureFormat(IAType sourceDatatype) throws Exception {
+        String propValue = (String) configuration.get(BATCH_SIZE);
+        int batchSize = propValue != null ? Integer.parseInt(propValue) : -1;
+        propValue = (String) configuration.get(BATCH_INTERVAL);
+        long batchInterval = propValue != null ? Long.parseLong(propValue) : -1;
+        boolean conditionalPush = batchSize > 0 || batchInterval > 0;
+
         String parserFactoryClassname = (String) configuration.get(KEY_PARSER_FACTORY);
         if (parserFactoryClassname == null) {
             String specifiedFormat = (String) configuration.get(KEY_FORMAT);
             if (specifiedFormat == null) {
                 throw new IllegalArgumentException(" Unspecified data format");
             } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
-                parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype);
+                parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush);
             } else if (FORMAT_ADM.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
-                parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype);
+                parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush);
             } else {
                 throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
             }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index e3deba9..e162ffe 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -150,4 +150,8 @@
         return configuration;
     }
 
+    public void setWriter(IFrameWriter writer) {
+
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
index a11197b..8efe919 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
@@ -99,7 +99,6 @@
 
     }
 
-    @SuppressWarnings("unchecked")
     private void writeRecord(AMutableRecord record, DataOutput dataOutput, IARecordBuilder recordBuilder)
             throws IOException, AsterixException {
         ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
index e17db67..be4debe 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
@@ -3,6 +3,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 
+import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -21,6 +22,7 @@
     protected final ITupleParser tupleParser;
     protected final IAType sourceDatatype;
     protected IHyracksTaskContext ctx;
+    protected AdapterRuntimeManager runtimeManager;
 
     public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx) {
         this.tupleParser = parserFactory.createTupleParser(ctx);
@@ -33,4 +35,5 @@
         InputStream in = getInputStream(partition);
         tupleParser.parse(in, writer);
     }
+
 }
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFactory.java
new file mode 100644
index 0000000..d15d661
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+
+public class EchoDelayFactory implements IFunctionFactory {
+
+    @Override
+    public IExternalScalarFunction getExternalFunction() {
+        return new EchoDelayFunction();
+    }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFunction.java
new file mode 100644
index 0000000..5f9be77
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/EchoDelayFunction.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import java.util.Random;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+
+public class EchoDelayFunction implements IExternalScalarFunction {
+
+    private Random rand = new Random();
+    private long sleepIntervalMin;
+    private long sleepIntervalMax;
+    private int range;
+
+    @Override
+    public void initialize(IFunctionHelper functionHelper) {
+        sleepIntervalMin = 50;
+        sleepIntervalMax = 100;
+        range = (new Long(sleepIntervalMax - sleepIntervalMin)).intValue();
+    }
+
+    @Override
+    public void deinitialize() {
+    }
+
+    @Override
+    public void evaluate(IFunctionHelper functionHelper) throws Exception {
+        JRecord inputRecord = (JRecord) functionHelper.getArgument(0);
+        long sleepInterval = rand.nextInt(range);
+     //   Thread.sleep(5);
+        functionHelper.setResult(inputRecord);
+    }
+}
diff --git a/asterix-external-data/src/test/resources/text_functions.xml b/asterix-external-data/src/test/resources/text_functions.xml
index bee6604..6d00029 100644
--- a/asterix-external-data/src/test/resources/text_functions.xml
+++ b/asterix-external-data/src/test/resources/text_functions.xml
@@ -41,5 +41,13 @@
 			<definition>edu.uci.ics.asterix.external.library.AllTypesFactory
 			</definition>
 		</function>
+		<function>
+			<function_type>SCALAR</function_type>
+			<name>echoDelay</name>
+			<arguments>TweetMessageType</arguments>
+			<return_type>TweetMessageType</return_type>
+			<definition>edu.uci.ics.asterix.external.library.EchoDelayFactory
+			</definition>
+		</function>
 	</functions>
 </library>
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ValidateCommand.java
index c5cbe7bf..5035028 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ValidateCommand.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ValidateCommand.java
@@ -102,7 +102,7 @@
 
             MasterNode masterNode = cluster.getMasterNode();
             Node master = new Node(masterNode.getId(), masterNode.getClusterIp(), masterNode.getJavaHome(),
-                    masterNode.getLogDir(), null, null, null);
+                    masterNode.getLogDir(), null, null, null, null);
             ipAddresses.add(masterNode.getClusterIp());
 
             valid = valid & validateNodeConfiguration(master, cluster);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 50fb050..eac653b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -317,7 +317,10 @@
                 "edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory",
                 "edu.uci.ics.asterix.external.adapter.factory.PullBasedTwitterAdapterFactory",
                 "edu.uci.ics.asterix.external.adapter.factory.RSSFeedAdapterFactory",
-                "edu.uci.ics.asterix.external.adapter.factory.CNNFeedAdapterFactory", };
+                "edu.uci.ics.asterix.external.adapter.factory.CNNFeedAdapterFactory",
+                "edu.uci.ics.asterix.tools.external.data.TwitterFirehoseFeedAdapterFactory",
+                "edu.uci.ics.asterix.tools.external.data.GenericSocketFeedAdapterFactory",
+                "edu.uci.ics.asterix.tools.external.data.SyntheticTwitterFeedAdapterFactory"};
         DatasourceAdapter adapter;
         for (String adapterClassName : builtInAdapterClassNames) {
             adapter = getAdapter(adapterClassName);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
index 4344ac8..68dcc4c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
@@ -20,4 +20,8 @@
         return numberOfNodes;
     }
 
+    @Override
+    public String toString() {
+        return WorkType.ADD_NODE + " " + numberOfNodes + " requested by " + subscriber;
+    }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
index b639559..5fee34f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
@@ -104,9 +104,11 @@
             Patterns startNCPattern = new Patterns(pattern);
             client.submit(startNCPattern);
 
+            removeNode(cluster.getNode(), node);
+
             AsterixInstance instance = lookupService.getAsterixInstance(cluster.getInstanceName());
             instance.getCluster().getNode().add(node);
-            instance.getCluster().getSubstituteNodes().getNode().remove(node);
+            removeNode(instance.getCluster().getSubstituteNodes().getNode(), node);
             lookupService.updateAsterixInstance(instance);
 
         } catch (Exception e) {
@@ -115,6 +117,19 @@
 
     }
 
+    private void removeNode(List<Node> list, Node node) {
+        Node nodeToRemove = null;
+        for (Node n : list) {
+            if (n.getId().equals(node.getId())) {
+                nodeToRemove = n;
+                break;
+            }
+        }
+        if (nodeToRemove != null) {
+            list.remove(nodeToRemove);
+        }
+    }
+
     @Override
     public void removeNode(Node node) throws AsterixException {
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
index 0b15df7..90683d1 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
@@ -22,4 +22,15 @@
         return nodesToBeRemoved;
     }
 
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append(WorkType.REMOVE_NODE);
+        for (String node : nodesToBeRemoved) {
+            builder.append(node + " ");
+        }
+        builder.append(" requested by " + subscriber);
+        return builder.toString();
+    }
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 775ef9e..3d3bcac 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -63,7 +63,6 @@
 import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
 import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
 import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
@@ -109,7 +108,6 @@
 import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -118,7 +116,6 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
@@ -129,7 +126,6 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
@@ -421,7 +417,6 @@
 
         FeedDatasetDetails datasetDetails = (FeedDatasetDetails) dataset.getDatasetDetails();
         DatasourceAdapter adapterEntity;
-        IDatasourceAdapter adapter;
         IAdapterFactory adapterFactory;
         IAType adapterOutputType;
         String adapterName;
@@ -490,12 +485,12 @@
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
             AqlMetadataProvider metadataProvider, JobSpecification jobSpec, FeedDatasetDetails datasetDetails,
-            String dataverse, String dataset, List<IFeedMessage> feedMessages, FeedActivity feedActivity)
+            String dataverse, String dataset, IFeedMessage feedMessage, FeedActivity feedActivity)
             throws AlgebricksException {
         AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(feedActivity
                 .getFeedActivityDetails().get(FeedActivityDetails.INGEST_LOCATIONS).split(","));
         FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, dataset,
-                feedMessages);
+                feedMessage);
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
index 48f16b6..b57f8fe 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
@@ -49,11 +49,13 @@
     public static class FeedActivityDetails {
         public static final String COMPUTE_LOCATIONS = "compute-locations";
         public static final String INGEST_LOCATIONS = "ingest-locations";
+        public static final String STORAGE_LOCATIONS = "storage-locations";
         public static final String TOTAL_INGESTED = "total-ingested";
         public static final String INGESTION_RATE = "ingestion-rate";
         public static final String EXCEPTION_LOCATION = "exception-location";
         public static final String EXCEPTION_MESSAGE = "exception-message";
         public static final String FEED_POLICY_NAME = "feed-policy-name";
+        public static final String FEED_NODE_FAILURE = "feed-node-failure";
 
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
new file mode 100644
index 0000000..78daa08
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
+import edu.uci.ics.asterix.metadata.feeds.MaterializingFrameWriter.Mode;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class AdapterRuntimeManager implements IAdapterExecutor {
+
+    private static final Logger LOGGER = Logger.getLogger(AdapterRuntimeManager.class.getName());
+
+    private final FeedId feedId;
+
+    private IFeedAdapter feedAdapter;
+
+    private AdapterExecutor adapterExecutor;
+
+    private Thread adapterRuntime;
+
+    private State state;
+
+    private FeedInboxMonitor feedInboxMonitor;
+
+    public enum State {
+        ACTIVE_INGESTION,
+        INACTIVE_INGESTION,
+        FINISHED_INGESTION
+    }
+
+    public AdapterRuntimeManager(FeedId feedId, IFeedAdapter feedAdapter, MaterializingFrameWriter writer,
+            int partition, LinkedBlockingQueue<IFeedMessage> inbox) {
+        this.feedId = feedId;
+        this.feedAdapter = feedAdapter;
+        this.adapterExecutor = new AdapterExecutor(partition, writer, feedAdapter, this);
+        this.adapterRuntime = new Thread(adapterExecutor);
+        this.feedInboxMonitor = new FeedInboxMonitor(this, inbox, partition);
+        AsterixThreadExecutor.INSTANCE.execute(feedInboxMonitor);
+    }
+
+    @Override
+    public void start() throws Exception {
+        state = State.ACTIVE_INGESTION;
+        FeedManager.INSTANCE.registerFeedRuntime(this);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Registered feed runtime manager for " + this.getFeedId());
+        }
+        adapterRuntime.start();
+    }
+
+    @Override
+    public void stop() {
+        try {
+            feedAdapter.stop();
+            state = State.FINISHED_INGESTION;
+            synchronized (this) {
+                notifyAll();
+            }
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Unable to stop adapter");
+            }
+        }
+    }
+
+    @Override
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    public IFeedAdapter getFeedAdapter() {
+        return feedAdapter;
+    }
+
+    public void setFeedAdapter(IFeedAdapter feedAdapter) {
+        this.feedAdapter = feedAdapter;
+    }
+
+    public static class AdapterExecutor implements Runnable {
+
+        private MaterializingFrameWriter writer;
+
+        private final int partition;
+
+        private IFeedAdapter adapter;
+
+        private AdapterRuntimeManager runtimeManager;
+
+        public AdapterExecutor(int partition, MaterializingFrameWriter writer, IFeedAdapter adapter,
+                AdapterRuntimeManager adapterRuntimeMgr) {
+            this.writer = writer;
+            this.partition = partition;
+            this.adapter = adapter;
+            this.runtimeManager = adapterRuntimeMgr;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Starting ingestion for partition:" + partition);
+                }
+                adapter.start(partition, writer);
+                runtimeManager.setState(State.FINISHED_INGESTION);
+                synchronized (runtimeManager) {
+                    runtimeManager.notifyAll();
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+        public MaterializingFrameWriter getWriter() {
+            return writer;
+        }
+
+        public void setWriter(IFrameWriter writer) {
+            if (this.writer != null) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Switching writer to:" + writer + " from " + this.writer);
+                }
+                this.writer.setWriter(writer);
+            }
+        }
+
+        public int getPartition() {
+            return partition;
+        }
+
+    }
+
+    public State getState() {
+        return state;
+    }
+
+    @SuppressWarnings("incomplete-switch")
+    public void setState(State state) throws HyracksDataException {
+        if (this.state.equals(state)) {
+            return;
+        }
+        switch (state) {
+            case INACTIVE_INGESTION:
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("SETTING STORE MODE");
+                }
+                adapterExecutor.getWriter().setMode(Mode.STORE);
+                break;
+            case ACTIVE_INGESTION:
+                adapterExecutor.getWriter().setMode(Mode.FORWARD);
+                break;
+        }
+        this.state = state;
+    }
+
+    public AdapterExecutor getAdapterExecutor() {
+        return adapterExecutor;
+    }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
index 3aa6b05..3db5916 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
@@ -8,17 +8,20 @@
 
 public class BuiltinFeedPolicies {
 
-    public static final FeedPolicy MISSTION_CRITICAL = initializeMissionCriticalFeedPolicy();
-
-    public static final FeedPolicy ADVANCED = initializeAdvancedFeedPolicy();
-
-    public static final FeedPolicy BASIC_MONITORED = initializeBasicMonitoredPolicy();
+    public static final FeedPolicy BRITTLE = initializeBrittlePolicy();
 
     public static final FeedPolicy BASIC = initializeBasicPolicy();
 
-    public static final FeedPolicy[] policies = new FeedPolicy[] { MISSTION_CRITICAL, ADVANCED, BASIC_MONITORED, BASIC };
+    public static final FeedPolicy BASIC_MONITORED = initializeBasicMonitoredPolicy();
 
-    public static final FeedPolicy DEFAULT_POLICY = ADVANCED;
+    public static final FeedPolicy FAULT_TOLERANT_BASIC_MONITORED = initializeFaultTolerantBasicMonitoredPolicy();
+
+    public static final FeedPolicy ELASTIC = initializeFaultTolerantBasicMonitoredElasticPolicy();
+
+    public static final FeedPolicy[] policies = new FeedPolicy[] { BRITTLE, BASIC, BASIC_MONITORED,
+            FAULT_TOLERANT_BASIC_MONITORED, ELASTIC };
+
+    public static final FeedPolicy DEFAULT_POLICY = FAULT_TOLERANT_BASIC_MONITORED;
 
     public static final String CONFIG_FEED_POLICY_KEY = "policy";
 
@@ -31,53 +34,74 @@
         return null;
     }
 
-    private static FeedPolicy initializeMissionCriticalFeedPolicy() {
+    private static FeedPolicy initializeFaultTolerantBasicMonitoredElasticPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "true");
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "true");
-        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "true");
-        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD, "60");
-        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
-        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        String description = "MissionCritical";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "MissionCritical", description, policyParams);
+        policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "true");
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "true");
+        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
+        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "true");
+        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD, "60");
+        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
+        policyParams.put(FeedPolicyAccessor.ELASTIC, "true");
+        String description = "Basic Monitored Fault-Tolerant Elastic";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BMFE", description, policyParams);
     }
 
-    private static FeedPolicy initializeBasicPolicy() {
+    private static FeedPolicy initializeFaultTolerantBasicMonitoredPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "false");
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "false");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "false");
-        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "false");
+        policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "true");
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "true");
+        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
+        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "true");
+        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD, "60");
+        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
         policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        String description = "Basic";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Basic", description, policyParams);
+        String description = "Basic Monitored Fault-Tolerant";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BMF", description, policyParams);
     }
 
     private static FeedPolicy initializeBasicMonitoredPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "true");
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "false");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "false");
-        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "true");
-        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD, "5");
-        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
+        policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "false");
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "true");
+        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "false");
+        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
+        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "true");
+        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD, "60");
+        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
         policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        String description = "BasicMonitored";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BasicMonitored", description, policyParams);
+        String description = "Basic Monitored Fault-Tolerant";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BM", description, policyParams);
     }
 
-    private static FeedPolicy initializeAdvancedFeedPolicy() {
+    private static FeedPolicy initializeBasicPolicy() {
         Map<String, String> policyParams = new HashMap<String, String>();
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "true");
-        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "true");
-        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "false");
-        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "true");
-        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD, "60");
-        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
+        policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "true");
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "true");
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "false");
+        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "true");
+        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "false");
         policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
-        String description = "Advanced";
-        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Advanced", description, policyParams);
+        String description = "Basic";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "B", description, policyParams);
     }
+
+    private static FeedPolicy initializeBrittlePolicy() {
+        Map<String, String> policyParams = new HashMap<String, String>();
+        policyParams.put(FeedPolicyAccessor.FAILURE_LOG_ERROR, "false");
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE, "false");
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_LOG_DATA, "false");
+        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_CONTINUE, "false");
+        policyParams.put(FeedPolicyAccessor.CLUSTER_REBOOT_AUTO_RESTART, "false");
+        policyParams.put(FeedPolicyAccessor.COLLECT_STATISTICS, "false");
+        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+        String description = "Brittle";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Br", description, policyParams);
+    }
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
new file mode 100644
index 0000000..a74627c
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
@@ -0,0 +1,205 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
+import edu.uci.ics.asterix.runtime.operators.file.AbstractTupleParser;
+import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
+import edu.uci.ics.asterix.runtime.operators.file.IDataParser;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class ConditionalPushTupleParserFactory implements ITupleParserFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(ConditionalPushTupleParserFactory.class.getName());
+
+    @Override
+    public ITupleParser createTupleParser(IHyracksTaskContext ctx) {
+        IDataParser dataParser = null;
+        switch (parserType) {
+            case ADM:
+                dataParser = new ADMDataParser();
+                break;
+            case DELIMITED_DATA:
+                dataParser = new DelimitedDataParser(recordType, valueParserFactories, delimiter);
+                break;
+        }
+        return new ConditionalPushTupleParser(ctx, recordType, dataParser, configuration);
+    }
+
+    private final ARecordType recordType;
+    private final Map<String, Object> configuration;
+    private IValueParserFactory[] valueParserFactories;
+    private char delimiter;
+    private final ParserType parserType;
+    private Object lock;
+
+    public enum ParserType {
+        ADM,
+        DELIMITED_DATA
+    }
+
+    public ConditionalPushTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
+            char fieldDelimiter, Map<String, Object> configuration) {
+        this.recordType = recordType;
+        this.valueParserFactories = valueParserFactories;
+        this.delimiter = fieldDelimiter;
+        this.configuration = configuration;
+        this.parserType = ParserType.DELIMITED_DATA;
+
+    }
+
+    public ConditionalPushTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
+        this.recordType = recordType;
+        this.configuration = configuration;
+        this.parserType = ParserType.ADM;
+    }
+
+}
+
+class ConditionalPushTupleParser extends AbstractTupleParser {
+
+    private final IDataParser dataParser;
+    private int batchSize;
+    private long batchInterval;
+    private boolean continueIngestion = true;
+    private int tuplesInFrame = 0;
+    private TimeBasedFlushTask flushTask;
+    private Timer timer = new Timer();
+    private Object lock = new Object();
+    private boolean activeTimer = false;
+
+    public static final String BATCH_SIZE = "batch-size";
+    public static final String BATCH_INTERVAL = "batch-interval";
+
+    public ConditionalPushTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
+            Map<String, Object> configuration) {
+        super(ctx, recType);
+        this.dataParser = dataParser;
+        String propValue = (String) configuration.get(BATCH_SIZE);
+        batchSize = propValue != null ? Integer.parseInt(propValue) : Integer.MAX_VALUE;
+        propValue = (String) configuration.get(BATCH_INTERVAL);
+        batchInterval = propValue != null ? Long.parseLong(propValue) : -1;
+        activeTimer = batchInterval > 0;
+    }
+
+    public void stop() {
+        continueIngestion = false;
+    }
+
+    @Override
+    public IDataParser getDataParser() {
+        return dataParser;
+    }
+
+    @Override
+    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+        flushTask = new TimeBasedFlushTask(writer, lock);
+        appender.reset(frame, true);
+        IDataParser parser = getDataParser();
+        try {
+            parser.initialize(in, recType, true);
+            if (activeTimer) {
+                timer.schedule(flushTask, 0, batchInterval);
+            }
+            while (continueIngestion) {
+                tb.reset();
+                if (!parser.parse(tb.getDataOutput())) {
+                    break;
+                }
+                tb.addFieldEndOffset();
+                addTuple(writer);
+            }
+            if (appender.getTupleCount() > 0) {
+                if (activeTimer) {
+                    synchronized (lock) {
+                        FrameUtils.flushFrame(frame, writer);
+                    }
+                } else {
+                    FrameUtils.flushFrame(frame, writer);
+                }
+            }
+        } catch (AsterixException ae) {
+            throw new HyracksDataException(ae);
+        } catch (IOException ioe) {
+            throw new HyracksDataException(ioe);
+        } finally {
+            if (activeTimer) {
+                timer.cancel();
+            }
+        }
+    }
+
+    protected void addTuple(IFrameWriter writer) throws HyracksDataException {
+        if (activeTimer) {
+            synchronized (lock) {
+                addTupleToFrame(writer);
+            }
+        } else {
+            addTupleToFrame(writer);
+        }
+    }
+
+    protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException {
+        if (tuplesInFrame == batchSize || !appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                throw new IllegalStateException();
+            }
+            if (tuplesInFrame == batchSize) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Batch size exceeded! flushing frame " + "(" + tuplesInFrame + ")");
+                }
+            }
+            tuplesInFrame = 0;
+        }
+        tuplesInFrame++;
+    }
+
+    private class TimeBasedFlushTask extends TimerTask {
+
+        private IFrameWriter writer;
+        private final Object lock;
+
+        public TimeBasedFlushTask(IFrameWriter writer, Object lock) {
+            this.writer = writer;
+            this.lock = lock;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (tuplesInFrame > 0) {
+                    synchronized (lock) {
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("TTL expired flushing frame (" + tuplesInFrame + ")");
+                        }
+                        FrameUtils.flushFrame(frame, writer);
+                        appender.reset(frame, true);
+                        tuplesInFrame = 0;
+                    }
+                }
+            } catch (HyracksDataException e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
+}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 28d0da3..d007d6f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -52,9 +52,9 @@
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
             throws HyracksDataException {
-        IDatasourceAdapter adapter;
+        IFeedAdapter adapter;
         try {
-            adapter = adapterFactory.createAdapter(ctx);
+            adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx);
         } catch (Exception e) {
             throw new HyracksDataException("initialization of adapter failed", e);
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 6898ef5..3893e54 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -17,8 +17,11 @@
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
+import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
@@ -27,52 +30,65 @@
  */
 public class FeedIntakeOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
 
-    private final IDatasourceAdapter adapter;
+    private static Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName());
+
+    private IFeedAdapter adapter;
     private final int partition;
-    private final IFeedManager feedManager;
     private final FeedId feedId;
     private final LinkedBlockingQueue<IFeedMessage> inbox;
-    private FeedInboxMonitor feedInboxMonitor;
     private final Map<String, String> feedPolicy;
     private final FeedPolicyEnforcer policyEnforcer;
+    private AdapterRuntimeManager adapterRuntimeMgr;
 
-    public FeedIntakeOperatorNodePushable(FeedId feedId, IDatasourceAdapter adapter, Map<String, String> feedPolicy,
+    public FeedIntakeOperatorNodePushable(FeedId feedId, IFeedAdapter adapter, Map<String, String> feedPolicy,
             int partition) {
         this.adapter = adapter;
         this.partition = partition;
-        this.feedManager = (IFeedManager) FeedManager.INSTANCE;
         this.feedId = feedId;
         inbox = new LinkedBlockingQueue<IFeedMessage>();
         this.feedPolicy = feedPolicy;
         policyEnforcer = new FeedPolicyEnforcer(feedId, feedPolicy);
-
     }
 
     @Override
     public void open() throws HyracksDataException {
-        feedInboxMonitor = new FeedInboxMonitor((IFeedAdapter) adapter, inbox, partition);
-        AsterixThreadExecutor.INSTANCE.execute(feedInboxMonitor);
-        feedManager.registerFeedMsgQueue(feedId, inbox);
-
-        writer.open();
+        adapterRuntimeMgr = FeedManager.INSTANCE.getFeedRuntimeManager(feedId, partition);
         try {
-            if (adapter instanceof AbstractFeedDatasourceAdapter) {
-                ((AbstractFeedDatasourceAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
+            if (adapterRuntimeMgr == null) {
+                MaterializingFrameWriter mWriter = new MaterializingFrameWriter(writer);
+                adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, mWriter, partition, inbox);
+                if (adapter instanceof AbstractFeedDatasourceAdapter) {
+                    ((AbstractFeedDatasourceAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
+                }
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Beginning new feed:" + feedId);
+                }
+                mWriter.open();
+                adapterRuntimeMgr.start();
+            } else {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Resuming old feed:" + feedId);
+                }
+                adapter = adapterRuntimeMgr.getFeedAdapter();
+                writer.open();
+                adapterRuntimeMgr.getAdapterExecutor().setWriter(writer);
+                adapterRuntimeMgr.setState(State.ACTIVE_INGESTION);
             }
-            adapter.start(partition, writer);
+
+            synchronized (adapterRuntimeMgr) {
+                while (!adapterRuntimeMgr.getState().equals(State.FINISHED_INGESTION)) {
+                    adapterRuntimeMgr.wait();
+                }
+            }
+            FeedManager.INSTANCE.deRegisterFeedRuntime(adapterRuntimeMgr);
+        } catch (InterruptedException ie) {
+            // check policy
+            adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
         } catch (Exception e) {
             e.printStackTrace();
             throw new HyracksDataException(e);
         } finally {
             writer.close();
-            if (adapter instanceof IFeedAdapter) {
-                try {
-                    ((IFeedAdapter) adapter).stop();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-                feedManager.unregisterFeedMsgQueue(feedId, inbox);
-            }
         }
     }
 
@@ -99,11 +115,11 @@
 class FeedInboxMonitor extends Thread {
 
     private LinkedBlockingQueue<IFeedMessage> inbox;
-    private final IFeedAdapter adapter;
+    private final AdapterRuntimeManager runtimeMgr;
 
-    public FeedInboxMonitor(IFeedAdapter adapter, LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
+    public FeedInboxMonitor(AdapterRuntimeManager runtimeMgr, LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
         this.inbox = inbox;
-        this.adapter = adapter;
+        this.runtimeMgr = runtimeMgr;
     }
 
     @Override
@@ -112,11 +128,11 @@
             try {
                 IFeedMessage feedMessage = inbox.take();
                 switch (feedMessage.getMessageType()) {
-                    case STOP:
-                        adapter.stop();
+                    case END:
+                        runtimeMgr.stop();
                         break;
                     case ALTER:
-                        adapter.alter(((AlterFeedMessage) feedMessage).getAlteredConfParams());
+                        runtimeMgr.getFeedAdapter().alter(((AlterFeedMessage) feedMessage).getAlteredConfParams());
                         break;
                 }
             } catch (InterruptedException ie) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index f9892f2..b3321cb 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -14,19 +14,13 @@
  */
 package edu.uci.ics.asterix.metadata.feeds;
 
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.FeedId;
-import edu.uci.ics.asterix.metadata.feeds.IFeedManager;
-import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
 
 /**
- * Handle (de-)registration of feeds for delivery of control messages.
+ * Handle (de)registration of feeds for delivery of control messages.
  */
 public class FeedManager implements IFeedManager {
 
@@ -36,39 +30,45 @@
 
     }
 
-    private Map<FeedId, Set<LinkedBlockingQueue<IFeedMessage>>> outGoingMsgQueueMap = new HashMap<FeedId, Set<LinkedBlockingQueue<IFeedMessage>>>();
+    private Map<FeedId, List<AdapterRuntimeManager>> activeFeedRuntimeManagers = new HashMap<FeedId, List<AdapterRuntimeManager>>();
 
     @Override
-    public void deliverMessage(FeedId feedId, IFeedMessage feedMessage) throws AsterixException {
-        Set<LinkedBlockingQueue<IFeedMessage>> operatorQueues = outGoingMsgQueueMap.get(feedId);
-        try {
-            if (operatorQueues != null) {
-                for (LinkedBlockingQueue<IFeedMessage> queue : operatorQueues) {
-                    queue.put(feedMessage);
+    public synchronized void registerFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr) {
+        List<AdapterRuntimeManager> adpaterRuntimeMgrs = activeFeedRuntimeManagers.get(adapterRuntimeMgr.getFeedId());
+        if (adpaterRuntimeMgrs == null) {
+            adpaterRuntimeMgrs = new ArrayList<AdapterRuntimeManager>();
+            activeFeedRuntimeManagers.put(adapterRuntimeMgr.getFeedId(), adpaterRuntimeMgrs);
+        }
+        adpaterRuntimeMgrs.add(adapterRuntimeMgr);
+    }
+
+    @Override
+    public synchronized void deRegisterFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr) {
+        List<AdapterRuntimeManager> adapterRuntimeMgrs = activeFeedRuntimeManagers.get(adapterRuntimeMgr.getFeedId());
+        if (adapterRuntimeMgrs != null && adapterRuntimeMgrs.contains(adapterRuntimeMgr)) {
+            adapterRuntimeMgrs.remove(adapterRuntimeMgr);
+        }
+    }
+
+    @Override
+    public synchronized AdapterRuntimeManager getFeedRuntimeManager(FeedId feedId, int partition) {
+        List<AdapterRuntimeManager> adapterRuntimeMgrs = activeFeedRuntimeManagers.get(feedId);
+        if (adapterRuntimeMgrs != null) {
+            if (adapterRuntimeMgrs.size() == 1) {
+                return adapterRuntimeMgrs.get(0);
+            } else {
+                for (AdapterRuntimeManager mgr : adapterRuntimeMgrs) {
+                    if (mgr.getAdapterExecutor().getPartition() == partition) {
+                        return mgr;
+                    }
                 }
             }
-        } catch (Exception e) {
-            throw new AsterixException(e);
         }
+        return null;
     }
 
-    @Override
-    public void registerFeedMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue) {
-        Set<LinkedBlockingQueue<IFeedMessage>> feedQueues = outGoingMsgQueueMap.get(feedId);
-        if (feedQueues == null) {
-            feedQueues = new HashSet<LinkedBlockingQueue<IFeedMessage>>();
-        }
-        feedQueues.add(queue);
-        outGoingMsgQueueMap.put(feedId, feedQueues);
-    }
-
-    @Override
-    public void unregisterFeedMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue) {
-        Set<LinkedBlockingQueue<IFeedMessage>> feedQueues = outGoingMsgQueueMap.get(feedId);
-        if (feedQueues == null || !feedQueues.contains(queue)) {
-            throw new IllegalArgumentException(" Unable to de-register feed message queue. Unknown feedId " + feedId);
-        }
-        feedQueues.remove(queue);
+    public List<AdapterRuntimeManager> getFeedRuntimeManagers(FeedId feedId) {
+        return activeFeedRuntimeManagers.get(feedId);
     }
 
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
index 677fc40..a302978 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
@@ -14,8 +14,6 @@
  */
 package edu.uci.ics.asterix.metadata.feeds;
 
-import java.util.List;
-
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -31,20 +29,19 @@
     private static final long serialVersionUID = 1L;
 
     private final FeedId feedId;
-    private final List<IFeedMessage> feedMessages;
-    private final boolean sendToAll = true;
+    private final IFeedMessage feedMessage;
 
     public FeedMessageOperatorDescriptor(JobSpecification spec, String dataverse, String dataset,
-            List<IFeedMessage> feedMessages) {
+            IFeedMessage feedMessage) {
         super(spec, 0, 1);
         this.feedId = new FeedId(dataverse, dataset);
-        this.feedMessages = feedMessages;
+        this.feedMessage = feedMessage;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return new FeedMessageOperatorNodePushable(ctx, feedId, feedMessages, sendToAll, partition, nPartitions);
+        return new FeedMessageOperatorNodePushable(ctx, feedId, feedMessage, partition, nPartitions);
     }
 
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index be5c8f9..03daa145 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -14,9 +14,7 @@
  */
 package edu.uci.ics.asterix.metadata.feeds;
 
-import java.util.ArrayList;
-import java.util.List;
-
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -27,27 +25,33 @@
 public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
 
     private final FeedId feedId;
-    private final List<IFeedMessage> feedMessages;
-    private IFeedManager feedManager;
+    private final IFeedMessage feedMessage;
+    private final int partition;
 
-    public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, List<IFeedMessage> feedMessages,
-            boolean applyToAll, int partition, int nPartitions) {
+    public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, IFeedMessage feedMessage,
+            int partition, int nPartitions) {
         this.feedId = feedId;
-        if (applyToAll) {
-            this.feedMessages = feedMessages;
-        } else {
-            this.feedMessages = new ArrayList<IFeedMessage>();
-            feedMessages.add(feedMessages.get(partition));
-        }
-        feedManager = (IFeedManager) FeedManager.INSTANCE;
+        this.feedMessage = feedMessage;
+        this.partition = partition;
     }
 
     @Override
     public void initialize() throws HyracksDataException {
         try {
             writer.open();
-            for (IFeedMessage feedMessage : feedMessages) {
-                feedManager.deliverMessage(feedId, feedMessage);
+            AdapterRuntimeManager adapterRuntimeMgr = FeedManager.INSTANCE.getFeedRuntimeManager(feedId, partition);
+            if (adapterRuntimeMgr != null) {
+                switch (feedMessage.getMessageType()) {
+                    case END:
+                        adapterRuntimeMgr.stop();
+                        break;
+                    case ALTER:
+                        adapterRuntimeMgr.getFeedAdapter().alter(
+                                ((AlterFeedMessage) feedMessage).getAlteredConfParams());
+                        break;
+                }
+            } else {
+                throw new AsterixException("Unknown feed: " + feedId);
             }
         } catch (Exception e) {
             throw new HyracksDataException(e);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
index 3e16877..a3c505b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
@@ -3,12 +3,14 @@
 import java.util.Map;
 
 public class FeedPolicyAccessor {
-    public static final String APPLICATION_FAILURE_PERSIST_EXCEPTION = "software.failure.persist.exception";
-    public static final String APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION = "software.failure.continue.on.exception";
-    public static final String HARDWARE_FAILURE_AUTO_RESTART = "hardware.failure.auto.restart";
-    public static final String STATISTICS_COLLECT = "statistics.collect";
-    public static final String STATISTICS_COLLECT_PERIOD = "statistics.collect.period";
-    public static final String STATISTICS_COLLECT_PERIOD_UNIT = "statistics.collect.period.unit";
+    public static final String FAILURE_LOG_ERROR = "failure.log.error";
+    public static final String APPLICATION_FAILURE_LOG_DATA = "application.failure.log.data";
+    public static final String APPLICATION_FAILURE_CONTINUE = "application.failure.continue";
+    public static final String HARDWARE_FAILURE_CONTINUE = "hardware.failure.continue";
+    public static final String CLUSTER_REBOOT_AUTO_RESTART = "cluster.reboot.auto.restart";
+    public static final String COLLECT_STATISTICS = "collect.statistics";
+    public static final String COLLECT_STATISTICS_PERIOD = "collect.statistics.period";
+    public static final String COLLECT_STATISTICS_PERIOD_UNIT = "collect.statistics.period.unit";
     public static final String ELASTIC = "elastic";
 
     public enum TimeUnit {
@@ -24,24 +26,32 @@
         this.feedPolicy = feedPolicy;
     }
 
-    public boolean persistExceptionDetailsOnApplicationFailure() {
-        return getBooleanPropertyValue(APPLICATION_FAILURE_PERSIST_EXCEPTION);
+    public boolean logErrorOnFailure() {
+        return getBooleanPropertyValue(FAILURE_LOG_ERROR);
+    }
+
+    public boolean logDataOnApplicationFailure() {
+        return getBooleanPropertyValue(APPLICATION_FAILURE_LOG_DATA);
     }
 
     public boolean continueOnApplicationFailure() {
-        return getBooleanPropertyValue(APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION);
+        return getBooleanPropertyValue(APPLICATION_FAILURE_CONTINUE);
     }
 
-    public boolean autoRestartOnHardwareFailure() {
-        return getBooleanPropertyValue(HARDWARE_FAILURE_AUTO_RESTART);
+    public boolean continueOnHardwareFailure() {
+        return getBooleanPropertyValue(HARDWARE_FAILURE_CONTINUE);
+    }
+
+    public boolean autoRestartOnClusterReboot() {
+        return getBooleanPropertyValue(CLUSTER_REBOOT_AUTO_RESTART);
     }
 
     public boolean collectStatistics() {
-        return getBooleanPropertyValue(STATISTICS_COLLECT);
+        return getBooleanPropertyValue(COLLECT_STATISTICS);
     }
 
     public long getStatisicsCollectionPeriodInSecs() {
-        return getIntegerPropertyValue(STATISTICS_COLLECT_PERIOD) * getTimeUnitFactor();
+        return getIntegerPropertyValue(COLLECT_STATISTICS_PERIOD) * getTimeUnitFactor();
     }
 
     public boolean isElastic() {
@@ -49,7 +59,7 @@
     }
 
     private int getTimeUnitFactor() {
-        String v = feedPolicy.get(STATISTICS_COLLECT_PERIOD_UNIT);
+        String v = feedPolicy.get(COLLECT_STATISTICS_PERIOD_UNIT);
         int factor = 1;
         switch (TimeUnit.valueOf(v)) {
             case SEC:
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
index f708c91..0f8bcee 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
@@ -26,7 +26,7 @@
 
     public boolean handleSoftwareFailure(Exception e) throws RemoteException, ACIDException {
         boolean continueIngestion = feedPolicyAccessor.continueOnApplicationFailure();
-        if (feedPolicyAccessor.persistExceptionDetailsOnApplicationFailure()) {
+        if (feedPolicyAccessor.logErrorOnFailure()) {
             persistExceptionDetails(e);
         }
         return continueIngestion;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java
new file mode 100644
index 0000000..0f71166
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+public interface IAdapterExecutor {
+
+    public void start() throws Exception;
+
+    public void stop() throws Exception;
+
+    public FeedId getFeedId();
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
index a14b95f..9c5a612 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
@@ -20,7 +20,7 @@
  * Interface implemented by an adapter that can be controlled or managed by external
  * commands (stop,alter)
  */
-public interface IFeedAdapter {
+public interface IFeedAdapter extends IDatasourceAdapter {
 
     /**
      * Discontinue the ingestion of data and end the feed.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
index ae728b2..46302e8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
@@ -14,47 +14,26 @@
  */
 package edu.uci.ics.asterix.metadata.feeds;
 
-import java.util.concurrent.LinkedBlockingQueue;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-
 /**
- * Handle (de-)registration of feeds for delivery of control messages.
+ * Handle (de)registration of feeds for delivery of control messages.
  */
 public interface IFeedManager {
 
     /**
-     * Register an input message queue for a feed specified by feedId.
-     * All messages sent to a feed are directed to the registered queue(s).
-     * 
-     * @param feedId
-     *            an identifier for the feed dataset.
-     * @param queue
-     *            an input message queue for receiving control messages.
+     * @param adapterRuntimeMgr
      */
-    public void registerFeedMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue);
+    public void registerFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr);
 
     /**
-     * Unregister an input message queue for a feed specified by feedId.
-     * A feed prior to finishing should unregister all previously registered queue(s)
-     * as it is no longer active and thus need not process any control messages.
-     * 
-     * @param feedId
-     *            an identifier for the feed dataset.
-     * @param queue
-     *            an input message queue for receiving control messages.
+     * @param adapterRuntimeMgr
      */
-    public void unregisterFeedMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue);
+    public void deRegisterFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr);
 
     /**
-     * Deliver a message to a feed with a given feedId.
-     * 
      * @param feedId
-     *            identifier for the feed dataset.
-     * @param feedMessage
-     *            control message that needs to be delivered.
-     * @throws Exception
+     * @param partition
+     * @return
      */
-    public void deliverMessage(FeedId feedId, IFeedMessage feedMessage) throws AsterixException;
+    public AdapterRuntimeManager getFeedRuntimeManager(FeedId feedId, int partition);
 
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
index 85a15ab..36a45be 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
@@ -19,7 +19,7 @@
 public interface IFeedMessage extends Serializable {
 
     public enum MessageType {
-        STOP,
+        END,
         ALTER,
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MaterializingFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MaterializingFrameWriter.java
new file mode 100644
index 0000000..b423b89
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MaterializingFrameWriter.java
@@ -0,0 +1,118 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class MaterializingFrameWriter implements IFrameWriter {
+
+    private static final Logger LOGGER = Logger.getLogger(MaterializingFrameWriter.class.getName());
+
+    private IFrameWriter writer;
+
+    private List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
+
+    private Mode mode;
+
+    public enum Mode {
+        FORWARD,
+        STORE
+    }
+
+    public MaterializingFrameWriter(IFrameWriter writer) {
+        this.writer = writer;
+        this.mode = Mode.FORWARD;
+    }
+
+    public Mode getMode() {
+        return mode;
+    }
+
+    public void setMode(Mode newMode) throws HyracksDataException {
+        if (this.mode.equals(newMode)) {
+            return;
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Switching to :" + newMode + " from " + this.mode);
+        }
+        switch (newMode) {
+            case FORWARD:
+                this.mode = newMode;
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Sending accumulated frames :" + frames.size());
+                }
+                break;
+            case STORE:
+                this.mode = newMode;
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Beginning to store frames :");
+                    LOGGER.info("Frames accumulated till now:" + frames.size());
+                }
+                break;
+        }
+
+    }
+
+    public List<ByteBuffer> getStoredFrames() {
+        return frames;
+    }
+
+    public void clear() {
+        frames.clear();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        switch (mode) {
+            case FORWARD:
+                writer.nextFrame(buffer);
+                if (frames.size() > 0) {
+                    for (ByteBuffer buf : frames) {
+                        System.out.println("Flusing OLD frame: " + buf);
+                        writer.nextFrame(buf);
+                    }
+                }
+                frames.clear();
+                break;
+            case STORE:
+                ByteBuffer storageBuffer = ByteBuffer.allocate(buffer.capacity());
+                storageBuffer.put(buffer);
+                frames.add(storageBuffer);
+                storageBuffer.flip();
+                break;
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        writer.close();
+    }
+
+    public IFrameWriter getWriter() {
+        return writer;
+    }
+
+    public void setWriter(IFrameWriter writer) {
+        this.writer = writer;
+    }
+
+    @Override
+    public String toString() {
+        return "MaterializingFrameWriter using " + writer;
+    }
+}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
index f764954..96cba42 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
@@ -114,8 +114,8 @@
     }
 
     public Node getAvailableSubstitutionNode() {
-        List<Node> subNodes = cluster.getSubstituteNodes().getNode();
-        return subNodes.isEmpty() ? null : subNodes.get(0);
+        List<Node> subNodes = cluster.getSubstituteNodes() == null ? null : cluster.getSubstituteNodes().getNode();
+        return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
     }
 
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
index f2d9384..25cb703 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/file/AbstractTupleParser.java
@@ -18,6 +18,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.om.types.ARecordType;
@@ -36,6 +38,8 @@
  */
 public abstract class AbstractTupleParser implements ITupleParser {
 
+    protected static Logger LOGGER = Logger.getLogger(AbstractTupleParser.class.getName());
+
     protected ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
     protected DataOutput dos = tb.getDataOutput();
     protected final FrameTupleAppender appender;
@@ -68,6 +72,7 @@
                 addTupleToFrame(writer);
             }
             if (appender.getTupleCount() > 0) {
+
                 FrameUtils.flushFrame(frame, writer);
             }
         } catch (AsterixException ae) {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
index 39537a8..0c93564 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
@@ -14,6 +14,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -182,7 +183,8 @@
             Message message = randMessageGen.getNextRandomMessage();
             Point location = randLocationGen.getRandomPoint();
             DateTime sendTime = randDateGen.getNextRandomDatetime();
-            twMessage.reset(twMessageId + "", twUser, location, sendTime, message.getReferredTopics(), message);
+            twMessage.reset(UUID.randomUUID().toString(), twUser, location, sendTime, message.getReferredTopics(),
+                    message);
             twMessageId++;
             if (twUserId > numTwOnlyUsers) {
                 twUserId = 1;
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
new file mode 100644
index 0000000..b693073
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
@@ -0,0 +1,97 @@
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class GenericSocketFeedAdapter extends StreamBasedAdapter implements IFeedAdapter {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String KEY_PORT = "port";
+
+    private static final Logger LOGGER = Logger.getLogger(GenericSocketFeedAdapter.class.getName());
+
+    private Map<String, Object> configuration;
+
+    private SocketFeedServer socketFeedServer;
+
+    private static final int DEFAULT_PORT = 2909;
+
+    public GenericSocketFeedAdapter(Map<String, Object> configuration, ITupleParserFactory parserFactory,
+            ARecordType outputtype, IHyracksTaskContext ctx) throws AsterixException, IOException {
+        super(parserFactory, outputtype, ctx);
+        this.configuration = configuration;
+        String portValue = (String) this.configuration.get(KEY_PORT);
+        int port = portValue != null ? Integer.parseInt(portValue) : DEFAULT_PORT;
+        this.socketFeedServer = new SocketFeedServer(configuration, outputtype, port);
+    }
+
+    @Override
+    public void start(int partition, IFrameWriter writer) throws Exception {
+        super.start(partition, writer);
+    }
+
+    @Override
+    public InputStream getInputStream(int partition) throws IOException {
+        return socketFeedServer.getInputStream();
+    }
+
+    private static class SocketFeedServer {
+        private ServerSocket serverSocket;
+        private InputStream inputStream;
+
+        public SocketFeedServer(Map<String, Object> configuration, ARecordType outputtype, int port)
+                throws IOException, AsterixException {
+            try {
+                serverSocket = new ServerSocket(port);
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("port: " + port + " unusable ");
+                }
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Feed server configured to use port: " + port);
+            }
+        }
+
+        public InputStream getInputStream() {
+            Socket socket;
+            try {
+                socket = serverSocket.accept();
+                inputStream = socket.getInputStream();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            return inputStream;
+        }
+
+        public void stop() throws IOException {
+            serverSocket.close();
+        }
+
+    }
+
+    @Override
+    public void stop() throws Exception {
+        socketFeedServer.stop();
+    }
+
+    @Override
+    public void alter(Map<String, Object> properties) {
+        // TODO Auto-generated method stub
+
+    }
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
new file mode 100644
index 0000000..961e519
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
@@ -0,0 +1,79 @@
+/*
+x * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory class for creating @see{GenericSocketFeedAdapter} The
+ * adapter listens at a port for receiving data (from external world).
+ * Data received is transformed into Asterix Data Format (ADM) and stored into
+ * a dataset a configured in the Adapter configuration.
+ */
+public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory implements ITypedAdapterFactory {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    private ARecordType outputType;
+
+    @Override
+    public String getName() {
+        return "generic_socket_feed";
+    }
+
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.GENERIC;
+    }
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    @Override
+    public void configure(Map<String, Object> configuration) throws Exception {
+        this.configuration = configuration;
+        outputType = (ARecordType) configuration.get(KEY_SOURCE_DATATYPE);
+        this.configureFormat(outputType);
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(1);
+    }
+
+    @Override
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+        return new GenericSocketFeedAdapter(configuration, parserFactory, outputType, ctx);
+    }
+
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return outputType;
+    }
+
+}
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 16812d9..dbe5d9a 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -19,7 +19,7 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.FileSystemAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory;
 import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
@@ -47,7 +47,7 @@
  * on the local file system or on HDFS. The feed ends when the content of the
  * source file has been ingested.
  */
-public class RateControlledFileSystemBasedAdapterFactory extends FileSystemAdapterFactory {
+public class RateControlledFileSystemBasedAdapterFactory extends StreamBasedAdapterFactory {
     private static final long serialVersionUID = 1L;
 
     public static final String KEY_FILE_SYSTEM = "fs";
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
index 5ba983e..0e58ed2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
@@ -115,12 +115,12 @@
 
         private void writeTweet(TweetMessage next) {
 
-            //tweet id
+            // tweet id
             LOGGER.info("Generating next tweet");
             ((AMutableString) mutableFields[0]).setValue(next.getTweetid());
             mutableRecord.setValueAtPos(0, mutableFields[0]);
 
-            // user 
+            // user
             AMutableRecord userRecord = ((AMutableRecord) mutableFields[1]);
             ((AMutableString) userRecord.getValueByPos(0)).setValue(next.getUser().getScreenName());
             ((AMutableString) userRecord.getValueByPos(1)).setValue("en");
@@ -171,12 +171,10 @@
 
         @Override
         public InflowState setNextRecord() throws Exception {
-            LOGGER.info("requesting next tweet");
             boolean moreData = tweetIterator.hasNext();
             if (!moreData) {
                 return InflowState.NO_MORE_DATA;
-            } 
-            LOGGER.info("writing next tweet");
+            }
             writeTweet(tweetIterator.next());
             if (tweetInterval != 0) {
                 tweetCount++;
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
index 92dc394..afb57bf 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -57,7 +57,7 @@
 
     @Override
     public String getName() {
-        return "synthetic_twitter_feed";
+        return "pull_twitter_feed";
     }
 
     @Override
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
index 7b17923..9cae374 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
@@ -5,6 +5,8 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -100,14 +102,15 @@
         String tweet = next.toString();
         os.write(tweet.getBytes());
         os.write(EOL);
-        LOGGER.info(tweet);
+        /*
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(tweet);
+        }*/
     }
 
     private void writeTweetRecord(TweetMessage next) {
 
         //tweet id
-        LOGGER.info("Generating next tweet");
-
         ((AMutableString) mutableFields[0]).setValue(next.getTweetid());
         mutableRecord.setValueAtPos(0, mutableFields[0]);
 
@@ -162,12 +165,10 @@
 
     @Override
     public InflowState setNextRecord() throws Exception {
-        LOGGER.info("requesting next tweet");
         boolean moreData = tweetIterator.hasNext();
         if (!moreData) {
             return InflowState.NO_MORE_DATA;
         }
-        LOGGER.info("writing next tweet");
         TweetMessage msg = tweetIterator.next();
         if (isOutputFormatRecord) {
             writeTweetRecord(msg);
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index eef551d..c8fa860 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -7,6 +7,7 @@
 import java.net.Socket;
 import java.net.UnknownHostException;
 import java.util.Map;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -47,8 +48,7 @@
         this.twitterFeedClient = new TweetGenerator(configuration, outputtype, 0,
                 TweetGenerator.OUTPUT_FORMAT_ADM_STRING);
         this.twitterServer = new TwitterServer(configuration, outputtype);
-        this.twitterClient = new TwitterClient(PORT);
-
+        this.twitterClient = new TwitterClient(twitterServer.getPort());
     }
 
     @Override
@@ -64,12 +64,27 @@
     }
 
     private static class TwitterServer {
-        private final ServerSocket serverSocket;
+        private ServerSocket serverSocket;
         private final Listener listener;
+        private int port = -1;
 
         public TwitterServer(Map<String, Object> configuration, ARecordType outputtype) throws IOException,
                 AsterixException {
-            serverSocket = new ServerSocket(PORT);
+            int numAttempts = 0;
+            while (port < 0) {
+                try {
+                    serverSocket = new ServerSocket(PORT + numAttempts);
+                    port = PORT + numAttempts;
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("port: " + (PORT + numAttempts) + " unusable ");
+                    }
+                    numAttempts++;
+                }
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Twitter server configured to use port: " + port);
+            }
             listener = new Listener(serverSocket, configuration, outputtype);
         }
 
@@ -78,6 +93,14 @@
             t.start();
         }
 
+        public void stop() {
+            listener.stop();
+        }
+
+        public int getPort() {
+            return port;
+        }
+
     }
 
     private static class TwitterClient {
@@ -103,6 +126,7 @@
         private final ServerSocket serverSocket;
         private Socket socket;
         private TweetGenerator tweetGenerator;
+        private boolean continuePush = true;
 
         public Listener(ServerSocket serverSocket, Map<String, Object> configuration, ARecordType outputtype)
                 throws IOException, AsterixException {
@@ -119,7 +143,7 @@
                     socket = serverSocket.accept();
                     OutputStream os = socket.getOutputStream();
                     tweetGenerator.setOutputStream(os);
-                    while (state.equals(InflowState.DATA_AVAILABLE)) {
+                    while (state.equals(InflowState.DATA_AVAILABLE) && continuePush) {
                         state = tweetGenerator.setNextRecord();
                     }
                     os.close();
@@ -137,12 +161,15 @@
             }
         }
 
+        public void stop() {
+            continuePush = false;
+        }
+
     }
 
     @Override
     public void stop() throws Exception {
-        // TODO Auto-generated method stub
-
+        twitterServer.stop();
     }
 
     @Override
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 11c9172..9500436 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -20,7 +20,7 @@
 import java.util.Set;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.FileSystemAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
@@ -43,7 +43,7 @@
  * on the local file system or on HDFS. The feed ends when the content of the
  * source file has been ingested.
  */
-public class TwitterFirehoseFeedAdapterFactory extends FileSystemAdapterFactory implements ITypedAdapterFactory {
+public class TwitterFirehoseFeedAdapterFactory extends StreamBasedAdapterFactory implements ITypedAdapterFactory {
 
     /**
      * 
@@ -51,12 +51,13 @@
     private static final long serialVersionUID = 1L;
 
     private static final String KEY_DATAVERSE_DATASET = "dataverse-dataset";
+    private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
 
     private static final ARecordType outputType = initOutputType();
 
     @Override
     public String getName() {
-        return "twitter_firehose_feed";
+        return "twitter_firehose";
     }
 
     @Override
@@ -96,14 +97,31 @@
         }
         List<String> storageNodes = ng.getNodeNames();
         Set<String> nodes = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
-        String ingestionLocation = null;
         if (nodes.size() > storageNodes.size()) {
             nodes.removeAll(storageNodes);
         }
+
+        String iCardinalityParam = (String) configuration.get(KEY_INGESTION_CARDINALITY);
+        int iCardinality = iCardinalityParam != null ? Integer.parseInt(iCardinalityParam) : 1;
+        String[] ingestionLocations = new String[iCardinality];
         String[] nodesArray = nodes.toArray(new String[] {});
-        Random r = new Random();
-        ingestionLocation = nodesArray[r.nextInt(nodes.size())];
-        return new AlgebricksAbsolutePartitionConstraint(new String[] { ingestionLocation });
+        if (iCardinality > nodes.size()) {
+            for (int i = 0; i < nodesArray.length; i++) {
+                ingestionLocations[i] = nodesArray[i];
+            }
+
+            for (int j = nodesArray.length, k = 0; j < iCardinality; j++, k++) {
+                ingestionLocations[j] = storageNodes.get(k);
+            }
+        } else {
+            Random r = new Random();
+            int ingestLocIndex = r.nextInt(nodes.size());
+            ingestionLocations[0] = nodesArray[ingestLocIndex];
+            for (int i = 1; i < iCardinality; i++) {
+                ingestionLocations[i] = nodesArray[(ingestLocIndex + i) % nodesArray.length];
+            }
+        }
+        return new AlgebricksAbsolutePartitionConstraint(ingestionLocations);
     }
 
     @Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index 745abb3..5e380ff 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -121,7 +121,8 @@
                 // indicates an absence of logs any further.
             }
 
-            if (logicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
+            //if (logicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
+            if (logManager.isMemoryRead(logicalLogLocator.getLsn())) {
                 return next(currentLogLocator); //should read from memory if there is any further log
             }
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index e67f0164..b8d5312 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -591,7 +591,7 @@
 
     public boolean isMemoryRead(long currentLSN) {
         long flushLSN = lastFlushedLSN.get();
-        if ((flushLSN + 1) % logPageSize == 0) {
+        if ((flushLSN + 1) == currentLSN) {
             return false;
         }
         long logPageBeginOffset = flushLSN - (flushLSN % logPageSize);